# Copyright 2015 The TensorFlow Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== """Tests for tensorflow.ops.data_flow_ops.PaddingFIFOQueue.""" from __future__ import absolute_import from __future__ import division from __future__ import print_function import random import time import numpy as np from six.moves import xrange # pylint: disable=redefined-builtin from tensorflow.python.framework import constant_op from tensorflow.python.framework import dtypes as dtypes_lib from tensorflow.python.framework import errors_impl from tensorflow.python.framework import ops from tensorflow.python.framework import tensor_shape from tensorflow.python.ops import array_ops from tensorflow.python.ops import data_flow_ops from tensorflow.python.platform import test class PaddingFIFOQueueTest(test.TestCase): def testConstructor(self): with ops.Graph().as_default(): q = data_flow_ops.PaddingFIFOQueue( 10, dtypes_lib.float32, ((None,),), name="Q") self.assertTrue(isinstance(q.queue_ref, ops.Tensor)) self.assertProtoEquals(""" name:'Q' op:'PaddingFIFOQueueV2' attr { key: 'component_types' value { list { type: DT_FLOAT } } } attr { key: 'shapes' value { list { shape { dim { size: -1 } } } } } attr { key: 'capacity' value { i: 10 } } attr { key: 'container' value { s: '' } } attr { key: 'shared_name' value { s: '' } } """, q.queue_ref.op.node_def) def testMultiQueueConstructor(self): with ops.Graph().as_default(): q = data_flow_ops.PaddingFIFOQueue( 5, (dtypes_lib.int32, dtypes_lib.float32), ((), ()), shared_name="foo", name="Q") self.assertTrue(isinstance(q.queue_ref, ops.Tensor)) self.assertProtoEquals(""" name:'Q' op:'PaddingFIFOQueueV2' attr { key: 'component_types' value { list { type: DT_INT32 type : DT_FLOAT } } } attr { key: 'shapes' value { list { shape { } shape { } } } } attr { key: 'capacity' value { i: 5 } } attr { key: 'container' value { s: '' } } attr { key: 'shared_name' value { s: 'foo' } } """, q.queue_ref.op.node_def) def testConstructorWithShapes(self): with ops.Graph().as_default(): q = data_flow_ops.PaddingFIFOQueue( 5, (dtypes_lib.int32, dtypes_lib.float32), shapes=(tensor_shape.TensorShape([1, 1, 2, 3]), tensor_shape.TensorShape([5, 8])), name="Q") self.assertTrue(isinstance(q.queue_ref, ops.Tensor)) self.assertProtoEquals(""" name:'Q' op:'PaddingFIFOQueueV2' attr { key: 'component_types' value { list { type: DT_INT32 type : DT_FLOAT } } } attr { key: 'shapes' value { list { shape { dim { size: 1 } dim { size: 1 } dim { size: 2 } dim { size: 3 } } shape { dim { size: 5 } dim { size: 8 } } } } } attr { key: 'capacity' value { i: 5 } } attr { key: 'container' value { s: '' } } attr { key: 'shared_name' value { s: '' } } """, q.queue_ref.op.node_def) def testEnqueue(self): with self.cached_session(): q = data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.float32, ((),)) enqueue_op = q.enqueue((10.0,)) enqueue_op.run() def testEnqueueWithShape(self): with self.cached_session(): q = data_flow_ops.PaddingFIFOQueue( 10, dtypes_lib.float32, shapes=((3, 2),)) enqueue_correct_op = q.enqueue(([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]],)) enqueue_correct_op.run() with self.assertRaises(ValueError): q.enqueue(([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]],)) self.assertEqual(1, q.size().eval()) def testEnqueueManyWithShape(self): with self.cached_session(): q = data_flow_ops.PaddingFIFOQueue( 10, [dtypes_lib.int32, dtypes_lib.int32], shapes=[(), (2,)]) q.enqueue_many([[1, 2, 3, 4], [[1, 1], [2, 2], [3, 3], [4, 4]]]).run() self.assertEqual(4, q.size().eval()) def testParallelEnqueue(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.float32, ((),)) elems = [10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] enqueue_ops = [q.enqueue((x,)) for x in elems] dequeued_t = q.dequeue() # Run one producer thread for each element in elems. def enqueue(enqueue_op): sess.run(enqueue_op) threads = [ self.checkedThread( target=enqueue, args=(e,)) for e in enqueue_ops ] for thread in threads: thread.start() for thread in threads: thread.join() # Dequeue every element using a single thread. results = [] for _ in xrange(len(elems)): results.append(dequeued_t.eval()) self.assertItemsEqual(elems, results) def testParallelDequeue(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.float32, ((),)) elems = [10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] enqueue_ops = [q.enqueue((x,)) for x in elems] dequeued_t = q.dequeue() # Enqueue every element using a single thread. for enqueue_op in enqueue_ops: enqueue_op.run() # Run one consumer thread for each element in elems. results = [] def dequeue(): results.append(sess.run(dequeued_t)) threads = [self.checkedThread(target=dequeue) for _ in enqueue_ops] for thread in threads: thread.start() for thread in threads: thread.join() self.assertItemsEqual(elems, results) def testDequeue(self): with self.cached_session(): q = data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.float32, ((),)) elems = [10.0, 20.0, 30.0] enqueue_ops = [q.enqueue((x,)) for x in elems] dequeued_t = q.dequeue() for enqueue_op in enqueue_ops: enqueue_op.run() for i in xrange(len(elems)): vals = dequeued_t.eval() self.assertEqual([elems[i]], vals) def testEnqueueAndBlockingDequeue(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue(3, dtypes_lib.float32, ((),)) elems = [10.0, 20.0, 30.0] enqueue_ops = [q.enqueue((x,)) for x in elems] dequeued_t = q.dequeue() def enqueue(): # The enqueue_ops should run after the dequeue op has blocked. # TODO(mrry): Figure out how to do this without sleeping. time.sleep(0.1) for enqueue_op in enqueue_ops: sess.run(enqueue_op) results = [] def dequeue(): for _ in xrange(len(elems)): results.append(sess.run(dequeued_t)) enqueue_thread = self.checkedThread(target=enqueue) dequeue_thread = self.checkedThread(target=dequeue) enqueue_thread.start() dequeue_thread.start() enqueue_thread.join() dequeue_thread.join() for elem, result in zip(elems, results): self.assertEqual([elem], result) def testMultiEnqueueAndDequeue(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue(10, (dtypes_lib.int32, dtypes_lib.float32), ((), ())) elems = [(5, 10.0), (10, 20.0), (15, 30.0)] enqueue_ops = [q.enqueue((x, y)) for x, y in elems] dequeued_t = q.dequeue() for enqueue_op in enqueue_ops: enqueue_op.run() for i in xrange(len(elems)): x_val, y_val = sess.run(dequeued_t) x, y = elems[i] self.assertEqual([x], x_val) self.assertEqual([y], y_val) def testQueueSizeEmpty(self): with self.cached_session(): q = data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.float32, ((),)) self.assertEqual([0], q.size().eval()) def testQueueSizeAfterEnqueueAndDequeue(self): with self.cached_session(): q = data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.float32, ((),)) enqueue_op = q.enqueue((10.0,)) dequeued_t = q.dequeue() size = q.size() self.assertEqual([], size.get_shape()) enqueue_op.run() self.assertEqual(1, size.eval()) dequeued_t.op.run() self.assertEqual(0, size.eval()) def testEnqueueMany(self): with self.cached_session(): q = data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.float32, ((),)) elems = [10.0, 20.0, 30.0, 40.0] enqueue_op = q.enqueue_many((elems,)) dequeued_t = q.dequeue() enqueue_op.run() enqueue_op.run() for i in range(8): vals = dequeued_t.eval() self.assertEqual([elems[i % 4]], vals) def testEmptyEnqueueMany(self): with self.cached_session(): q = data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.float32, ( (None, None),)) empty_t = constant_op.constant( [], dtype=dtypes_lib.float32, shape=[0, 2, 3]) enqueue_op = q.enqueue_many((empty_t,)) size_t = q.size() self.assertEqual([0], size_t.eval()) enqueue_op.run() self.assertEqual([0], size_t.eval()) def testEmptyDequeueMany(self): with self.cached_session(): q = data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.float32, shapes=((),)) enqueue_op = q.enqueue((10.0,)) dequeued_t = q.dequeue_many(0) self.assertEqual([], dequeued_t.eval().tolist()) enqueue_op.run() self.assertEqual([], dequeued_t.eval().tolist()) def testEmptyDequeueManyWithDynamicShape(self): with self.cached_session(): q = data_flow_ops.PaddingFIFOQueue( 10, dtypes_lib.float32, shapes=((None,),)) enqueue_op = q.enqueue(([10.0],)) dequeued_t = q.dequeue_many(0) self.assertEqual([], dequeued_t.eval().tolist()) enqueue_op.run() self.assertEqual([], dequeued_t.eval().tolist()) def testEmptyDequeueUpToWithDynamicShape(self): with self.cached_session(): q = data_flow_ops.PaddingFIFOQueue( 10, dtypes_lib.float32, shapes=((None,),)) enqueue_op = q.enqueue(([10.0],)) dequeued_t = q.dequeue_up_to(0) self.assertEqual([], dequeued_t.eval().tolist()) enqueue_op.run() self.assertEqual([], dequeued_t.eval().tolist()) def testConstructPaddingFIFOQueueWithNoShape(self): with self.cached_session(): with self.assertRaisesRegexp( ValueError, r"When providing partial shapes, a list of shapes must be provided."): data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.float32, None).queue_ref.eval() def testMultiEnqueueMany(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue(10, (dtypes_lib.float32, dtypes_lib.int32), ((), (2,))) float_elems = [10.0, 20.0, 30.0, 40.0] int_elems = [[1, 2], [3, 4], [5, 6], [7, 8]] enqueue_op = q.enqueue_many((float_elems, int_elems)) dequeued_t = q.dequeue() enqueue_op.run() enqueue_op.run() for i in range(8): float_val, int_val = sess.run(dequeued_t) self.assertEqual(float_elems[i % 4], float_val) self.assertAllEqual(int_elems[i % 4], int_val) def testMultiEnqueueManyWithPartiallyKnownShapes(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue( 10, (dtypes_lib.float32, dtypes_lib.int32), shapes=((), (None,))) float_elems = [10.0, 20.0, 30.0, 40.0] int_elems = [[1, 2], [3, 4], [5, 6], [7, 8]] enqueue_op = q.enqueue_many((float_elems, int_elems)) dequeued_t = q.dequeue() enqueue_op.run() enqueue_op.run() for i in range(8): float_val, int_val = sess.run(dequeued_t) self.assertEqual(float_elems[i % 4], float_val) self.assertAllEqual(int_elems[i % 4], int_val) def testDequeueMany(self): with self.cached_session(): q = data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.float32, ((),)) elems = [10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] enqueue_op = q.enqueue_many((elems,)) dequeued_t = q.dequeue_many(4) enqueue_op.run() self.assertAllEqual(elems[0:4], dequeued_t.eval()) self.assertAllEqual(elems[4:8], dequeued_t.eval()) def testDequeueUpToNoBlocking(self): with self.cached_session(): q = data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.float32, ((),)) elems = [10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] enqueue_op = q.enqueue_many((elems,)) dequeued_t = q.dequeue_up_to(4) enqueue_op.run() self.assertAllEqual(elems[0:4], dequeued_t.eval()) self.assertAllEqual(elems[4:8], dequeued_t.eval()) def testMultiDequeueMany(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue( 10, (dtypes_lib.float32, dtypes_lib.int32), shapes=((), (2,))) float_elems = [ 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0 ] int_elems = [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10], [11, 12], [13, 14], [15, 16], [17, 18], [19, 20]] enqueue_op = q.enqueue_many((float_elems, int_elems)) dequeued_t = q.dequeue_many(4) dequeued_single_t = q.dequeue() enqueue_op.run() float_val, int_val = sess.run(dequeued_t) self.assertAllEqual(float_elems[0:4], float_val) self.assertAllEqual(int_elems[0:4], int_val) self.assertEqual(float_val.shape, dequeued_t[0].get_shape()) self.assertEqual(int_val.shape, dequeued_t[1].get_shape()) float_val, int_val = sess.run(dequeued_t) self.assertAllEqual(float_elems[4:8], float_val) self.assertAllEqual(int_elems[4:8], int_val) float_val, int_val = sess.run(dequeued_single_t) self.assertAllEqual(float_elems[8], float_val) self.assertAllEqual(int_elems[8], int_val) self.assertEqual(float_val.shape, dequeued_single_t[0].get_shape()) self.assertEqual(int_val.shape, dequeued_single_t[1].get_shape()) def testMultiDequeueManyWithPartiallyKnownShapes(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue( 10, (dtypes_lib.float32, dtypes_lib.int32), shapes=((), (None,))) float_elems = [ 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0 ] int_elems = [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10], [11, 12], [13, 14], [15, 16], [17, 18], [19, 20]] enqueue_op = q.enqueue_many((float_elems, int_elems)) dequeued_t = q.dequeue_many(4) dequeued_single_t = q.dequeue() enqueue_op.run() float_val, int_val = sess.run(dequeued_t) self.assertAllEqual(float_elems[0:4], float_val) self.assertAllEqual(int_elems[0:4], int_val) self.assertTrue( tensor_shape.TensorShape(float_val.shape).is_compatible_with( dequeued_t[0].get_shape())) self.assertTrue( tensor_shape.TensorShape(int_val.shape).is_compatible_with(dequeued_t[ 1].get_shape())) float_val, int_val = sess.run(dequeued_t) self.assertAllEqual(float_elems[4:8], float_val) self.assertAllEqual(int_elems[4:8], int_val) float_val, int_val = sess.run(dequeued_single_t) self.assertAllEqual(float_elems[8], float_val) self.assertAllEqual(int_elems[8], int_val) self.assertTrue( tensor_shape.TensorShape(float_val.shape).is_compatible_with( dequeued_single_t[0].get_shape())) self.assertTrue( tensor_shape.TensorShape(int_val.shape).is_compatible_with( dequeued_single_t[1].get_shape())) def testMultiDequeueManyWithPartiallyKnownShapesAndVariableSizeInput(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue( 10, (dtypes_lib.string, dtypes_lib.int32), shapes=((None,), (1, None))) str_elems = [["a"], ["ab"], ["abc"], ["abc", "d"], ["abc", "d", "e"], ["abc", "d", "e", "f"]] int_elems = [[[1]], [[2]], [[3]], [[1, 2]], [[1, 2, 3]], [[1, 2, 3, 4]]] enqueue_ops = [q.enqueue((str_elems[i], int_elems[i])) for i in range(6)] dequeued_t = q.dequeue_many(5) dequeued_single_t = q.dequeue() for enqueue_op in enqueue_ops: enqueue_op.run() string_val, int_val = sess.run(dequeued_t) self.assertAllEqual([[b"a", b"", b""], [b"ab", b"", b""], [b"abc", b"", b""], [b"abc", b"d", b""], [b"abc", b"d", b"e"]], string_val) self.assertAllEqual([[[1, 0, 0]], [[2, 0, 0]], [[3, 0, 0]], [[1, 2, 0]], [[1, 2, 3]]], int_val) self.assertTrue( tensor_shape.TensorShape(string_val.shape).is_compatible_with( dequeued_t[0].get_shape())) self.assertTrue( tensor_shape.TensorShape(int_val.shape).is_compatible_with(dequeued_t[ 1].get_shape())) string_val, int_val = sess.run(dequeued_single_t) self.assertAllEqual([b"abc", b"d", b"e", b"f"], string_val) self.assertAllEqual([[1, 2, 3, 4]], int_val) self.assertTrue( tensor_shape.TensorShape(string_val.shape).is_compatible_with( dequeued_single_t[0].get_shape())) self.assertTrue( tensor_shape.TensorShape(int_val.shape).is_compatible_with( dequeued_single_t[1].get_shape())) def testMultiDequeueUpToPartiallyKnownShapesAndVariableInputNoBlocking(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue( 10, (dtypes_lib.string, dtypes_lib.int32), shapes=((None,), (1, None))) str_elems = [["a"], ["ab"], ["abc"], ["abc", "d"], ["abc", "d", "e"], ["abc", "d", "e", "f"]] int_elems = [[[1]], [[2]], [[3]], [[1, 2]], [[1, 2, 3]], [[1, 2, 3, 4]]] enqueue_ops = [q.enqueue((str_elems[i], int_elems[i])) for i in range(6)] dequeued_t = q.dequeue_up_to(5) dequeued_single_t = q.dequeue() for enqueue_op in enqueue_ops: enqueue_op.run() string_val, int_val = sess.run(dequeued_t) self.assertAllEqual([[b"a", b"", b""], [b"ab", b"", b""], [b"abc", b"", b""], [b"abc", b"d", b""], [b"abc", b"d", b"e"]], string_val) self.assertAllEqual([[[1, 0, 0]], [[2, 0, 0]], [[3, 0, 0]], [[1, 2, 0]], [[1, 2, 3]]], int_val) self.assertTrue( tensor_shape.TensorShape(string_val.shape).is_compatible_with( dequeued_t[0].get_shape())) self.assertTrue( tensor_shape.TensorShape(int_val.shape).is_compatible_with(dequeued_t[ 1].get_shape())) string_val, int_val = sess.run(dequeued_single_t) self.assertAllEqual([b"abc", b"d", b"e", b"f"], string_val) self.assertAllEqual([[1, 2, 3, 4]], int_val) self.assertTrue( tensor_shape.TensorShape(string_val.shape).is_compatible_with( dequeued_single_t[0].get_shape())) self.assertTrue( tensor_shape.TensorShape(int_val.shape).is_compatible_with( dequeued_single_t[1].get_shape())) def testHighDimension(self): with self.cached_session(): q = data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.int32, ((4, 4, 4, 4),)) elems = np.array([[[[[x] * 4] * 4] * 4] * 4 for x in range(10)], np.int32) enqueue_op = q.enqueue_many((elems,)) dequeued_t = q.dequeue_many(10) enqueue_op.run() self.assertAllEqual(dequeued_t.eval(), elems) def testPartiallyKnownHighDimension(self): with self.cached_session(): q = data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.int32, ( (4, None, 4, None),)) elems = np.array([[[[[x] * 4] * 4] * 4] * 4 for x in range(10)], np.int32) enqueue_op = q.enqueue_many((elems,)) dequeued_t = q.dequeue_many(10) enqueue_op.run() self.assertAllEqual(dequeued_t.eval(), elems) def testEnqueueWrongShape(self): q = data_flow_ops.PaddingFIFOQueue(10, (dtypes_lib.int32, dtypes_lib.int32), ((), (2,))) with self.assertRaises(ValueError): q.enqueue(([1, 2], [2, 2])) with self.assertRaises(ValueError): q.enqueue_many((7, [[1, 2], [3, 4], [5, 6]])) def testBatchSizeMismatch(self): q = data_flow_ops.PaddingFIFOQueue(10, (dtypes_lib.int32, dtypes_lib.int32, dtypes_lib.int32), ((), (), ())) with self.assertRaises(ValueError): q.enqueue_many(([1, 2, 3], [1, 2], [1, 2, 3])) with self.assertRaises(ValueError): q.enqueue_many( ([1, 2, 3], [1, 2], array_ops.placeholder(dtypes_lib.int32))) with self.assertRaises(ValueError): q.enqueue_many( (array_ops.placeholder(dtypes_lib.int32), [1, 2], [1, 2, 3])) def testEnqueueManyEmptyTypeConversion(self): q = data_flow_ops.PaddingFIFOQueue(10, (dtypes_lib.int32, dtypes_lib.float32), ( (), ())) enq = q.enqueue_many(([], [])) self.assertEqual(dtypes_lib.int32, enq.inputs[1].dtype) self.assertEqual(dtypes_lib.float32, enq.inputs[2].dtype) def testEnqueueWrongType(self): q = data_flow_ops.PaddingFIFOQueue(10, (dtypes_lib.int32, dtypes_lib.float32), ( (), ())) with self.assertRaises(ValueError): q.enqueue((array_ops.placeholder(dtypes_lib.int32), array_ops.placeholder(dtypes_lib.int32))) with self.assertRaises(ValueError): q.enqueue_many((array_ops.placeholder(dtypes_lib.int32), array_ops.placeholder(dtypes_lib.int32))) def testEnqueueWrongPartiallyKnownShapeAtRuntime(self): with self.cached_session() as sess: # First dimension of second component is unknown, second # dimension must be 3. q = data_flow_ops.PaddingFIFOQueue(10, (dtypes_lib.int32, dtypes_lib.int32), ( (2, 2), (None, 3))) elems_ok = np.array([1] * 4).reshape((2, 2)).astype(np.int32) elems_bad = array_ops.placeholder(dtypes_lib.int32) enqueue_op = q.enqueue((elems_ok, elems_bad)) with self.assertRaisesRegexp(errors_impl.InvalidArgumentError, r"Expected \[\?,3\], got \[3,4\]"): sess.run([enqueue_op], feed_dict={elems_bad: np.array([1] * 12).reshape((3, 4))}) def testEnqueueDequeueManyWrongPartiallyKnownShape(self): with self.cached_session() as sess: # First dimension of second component is unknown, second # dimension must be 3. q = data_flow_ops.PaddingFIFOQueue(10, (dtypes_lib.int32, dtypes_lib.int32), ( (2, 2), (None, 3))) elems_ok = np.array([1] * 8).reshape((2, 2, 2)).astype(np.int32) elems_bad = array_ops.placeholder(dtypes_lib.int32) enqueue_op = q.enqueue_many((elems_ok, elems_bad)) dequeued_t = q.dequeue_many(2) with self.assertRaisesRegexp(errors_impl.InvalidArgumentError, "Shape mismatch in tuple component 1. " r"Expected \[2,\?,3\], got \[2,3,4\]"): sess.run([enqueue_op], feed_dict={elems_bad: np.array([1] * 24).reshape((2, 3, 4))}) dequeued_t.eval() def testParallelEnqueueMany(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue(1000, dtypes_lib.float32, shapes=((),)) elems = [10.0 * x for x in range(100)] enqueue_op = q.enqueue_many((elems,)) dequeued_t = q.dequeue_many(1000) # Enqueue 100 items in parallel on 10 threads. def enqueue(): sess.run(enqueue_op) threads = [self.checkedThread(target=enqueue) for _ in range(10)] for thread in threads: thread.start() for thread in threads: thread.join() self.assertItemsEqual(dequeued_t.eval(), elems * 10) def testParallelDequeueMany(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue(1000, dtypes_lib.float32, shapes=((),)) elems = [10.0 * x for x in range(1000)] enqueue_op = q.enqueue_many((elems,)) dequeued_t = q.dequeue_many(100) enqueue_op.run() # Dequeue 100 items in parallel on 10 threads. dequeued_elems = [] def dequeue(): dequeued_elems.extend(sess.run(dequeued_t)) threads = [self.checkedThread(target=dequeue) for _ in range(10)] for thread in threads: thread.start() for thread in threads: thread.join() self.assertItemsEqual(elems, dequeued_elems) def testParallelDequeueUpTo(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue(1000, dtypes_lib.float32, shapes=((),)) elems = [10.0 * x for x in range(1000)] enqueue_op = q.enqueue_many((elems,)) close_op = q.close() dequeued_t = q.dequeue_up_to(101) enqueue_op.run() close_op.run() # Dequeue up to 101 items in parallel on 10 threads, from closed queue. dequeued_elems = [] def dequeue(): dequeued_elems.extend(sess.run(dequeued_t)) threads = [self.checkedThread(target=dequeue) for _ in range(10)] for thread in threads: thread.start() for thread in threads: thread.join() self.assertItemsEqual(elems, dequeued_elems) def testParallelEnqueueAndDequeue(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue(50, dtypes_lib.float32, shapes=((),)) initial_elements = [10.0] * 49 q.enqueue_many((initial_elements,)).run() enqueue_op = q.enqueue((20.0,)) dequeued_t = q.dequeue() def enqueue(): for _ in xrange(100): sess.run(enqueue_op) def dequeue(): for _ in xrange(100): self.assertTrue(sess.run(dequeued_t) in (10.0, 20.0)) enqueue_threads = [self.checkedThread(target=enqueue) for _ in range(10)] dequeue_threads = [self.checkedThread(target=dequeue) for _ in range(10)] for enqueue_thread in enqueue_threads: enqueue_thread.start() for dequeue_thread in dequeue_threads: dequeue_thread.start() for enqueue_thread in enqueue_threads: enqueue_thread.join() for dequeue_thread in dequeue_threads: dequeue_thread.join() # Dequeue the initial count of elements to clean up. cleanup_elems = q.dequeue_many(49).eval() for elem in cleanup_elems: self.assertTrue(elem in (10.0, 20.0)) def testMixtureOfEnqueueAndEnqueueMany(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.int32, shapes=((),)) enqueue_placeholder = array_ops.placeholder(dtypes_lib.int32, shape=()) enqueue_op = q.enqueue((enqueue_placeholder,)) enqueuemany_placeholder = array_ops.placeholder( dtypes_lib.int32, shape=(None,)) enqueuemany_op = q.enqueue_many((enqueuemany_placeholder,)) dequeued_t = q.dequeue() close_op = q.close() def dequeue(): for i in xrange(250): self.assertEqual(i, sess.run(dequeued_t)) dequeue_thread = self.checkedThread(target=dequeue) dequeue_thread.start() elements_enqueued = 0 while elements_enqueued < 250: # With equal probability, run Enqueue or enqueue_many. if random.random() > 0.5: enqueue_op.run({enqueue_placeholder: elements_enqueued}) elements_enqueued += 1 else: count = random.randint(0, min(20, 250 - elements_enqueued)) range_to_enqueue = np.arange( elements_enqueued, elements_enqueued + count, dtype=np.int32) enqueuemany_op.run({enqueuemany_placeholder: range_to_enqueue}) elements_enqueued += count close_op.run() dequeue_thread.join() self.assertEqual(0, q.size().eval()) def testMixtureOfDequeueAndDequeueMany(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.int32, shapes=((),)) enqueue_op = q.enqueue_many((np.arange(250, dtype=np.int32),)) dequeued_t = q.dequeue() count_placeholder = array_ops.placeholder(dtypes_lib.int32, shape=()) dequeuemany_t = q.dequeue_many(count_placeholder) def enqueue(): sess.run(enqueue_op) enqueue_thread = self.checkedThread(target=enqueue) enqueue_thread.start() elements_dequeued = 0 while elements_dequeued < 250: # With equal probability, run Dequeue or dequeue_many. if random.random() > 0.5: self.assertEqual(elements_dequeued, dequeued_t.eval()) elements_dequeued += 1 else: count = random.randint(0, min(20, 250 - elements_dequeued)) expected_range = np.arange( elements_dequeued, elements_dequeued + count, dtype=np.int32) self.assertAllEqual(expected_range, dequeuemany_t.eval({ count_placeholder: count })) elements_dequeued += count q.close().run() enqueue_thread.join() self.assertEqual(0, q.size().eval()) def testBlockingDequeueMany(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.float32, ((),)) elems = [10.0, 20.0, 30.0, 40.0] enqueue_op = q.enqueue_many((elems,)) dequeued_t = q.dequeue_many(4) dequeued_elems = [] def enqueue(): # The enqueue_op should run after the dequeue op has blocked. # TODO(mrry): Figure out how to do this without sleeping. time.sleep(0.1) sess.run(enqueue_op) def dequeue(): dequeued_elems.extend(sess.run(dequeued_t).tolist()) enqueue_thread = self.checkedThread(target=enqueue) dequeue_thread = self.checkedThread(target=dequeue) enqueue_thread.start() dequeue_thread.start() enqueue_thread.join() dequeue_thread.join() self.assertAllEqual(elems, dequeued_elems) def testBlockingDequeueUpTo(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.float32, ((),)) elems = [10.0, 20.0, 30.0, 40.0] enqueue_op = q.enqueue_many((elems,)) dequeued_t = q.dequeue_up_to(4) dequeued_elems = [] def enqueue(): # The enqueue_op should run after the dequeue op has blocked. # TODO(mrry): Figure out how to do this without sleeping. time.sleep(0.1) sess.run(enqueue_op) def dequeue(): dequeued_elems.extend(sess.run(dequeued_t).tolist()) enqueue_thread = self.checkedThread(target=enqueue) dequeue_thread = self.checkedThread(target=dequeue) enqueue_thread.start() dequeue_thread.start() enqueue_thread.join() dequeue_thread.join() self.assertAllEqual(elems, dequeued_elems) def testDequeueManyWithTensorParameter(self): with self.cached_session(): # Define a first queue that contains integer counts. dequeue_counts = [random.randint(1, 10) for _ in range(100)] count_q = data_flow_ops.PaddingFIFOQueue(100, dtypes_lib.int32, ((),)) enqueue_counts_op = count_q.enqueue_many((dequeue_counts,)) total_count = sum(dequeue_counts) # Define a second queue that contains total_count elements. elems = [random.randint(0, 100) for _ in range(total_count)] q = data_flow_ops.PaddingFIFOQueue(total_count, dtypes_lib.int32, ((),)) enqueue_elems_op = q.enqueue_many((elems,)) # Define a subgraph that first dequeues a count, then DequeuesMany # that number of elements. dequeued_t = q.dequeue_many(count_q.dequeue()) enqueue_counts_op.run() enqueue_elems_op.run() dequeued_elems = [] for _ in dequeue_counts: dequeued_elems.extend(dequeued_t.eval()) self.assertEqual(elems, dequeued_elems) def testDequeueFromClosedQueue(self): with self.cached_session(): q = data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.float32, ((),)) elems = [10.0, 20.0, 30.0, 40.0] enqueue_op = q.enqueue_many((elems,)) close_op = q.close() dequeued_t = q.dequeue() enqueue_op.run() close_op.run() for elem in elems: self.assertEqual([elem], dequeued_t.eval()) # Expect the operation to fail due to the queue being closed. with self.assertRaisesRegexp(errors_impl.OutOfRangeError, "is closed and has insufficient"): dequeued_t.eval() def testBlockingDequeueFromClosedQueue(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.float32, ((),)) elems = [10.0, 20.0, 30.0, 40.0] enqueue_op = q.enqueue_many((elems,)) close_op = q.close() dequeued_t = q.dequeue() enqueue_op.run() def dequeue(): for elem in elems: self.assertEqual([elem], sess.run(dequeued_t)) # Expect the operation to fail due to the queue being closed. with self.assertRaisesRegexp(errors_impl.OutOfRangeError, "is closed and has insufficient"): sess.run(dequeued_t) dequeue_thread = self.checkedThread(target=dequeue) dequeue_thread.start() # The close_op should run after the dequeue_thread has blocked. # TODO(mrry): Figure out how to do this without sleeping. time.sleep(0.1) close_op.run() dequeue_thread.join() def testDequeueUpToFromClosedQueueReturnsRemainder(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.float32, ((),)) elems = [10.0, 20.0, 30.0, 40.0] enqueue_op = q.enqueue_many((elems,)) close_op = q.close() dequeued_t = q.dequeue_up_to(3) enqueue_op.run() def dequeue(): self.assertAllEqual(elems[:3], sess.run(dequeued_t)) self.assertAllEqual(elems[3:], sess.run(dequeued_t)) dequeue_thread = self.checkedThread(target=dequeue) dequeue_thread.start() # The close_op should run after the dequeue_thread has blocked. # TODO(mrry): Figure out how to do this without sleeping. time.sleep(0.1) close_op.run() dequeue_thread.join() def testBlockingDequeueFromClosedEmptyQueue(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.float32, ((),)) close_op = q.close() dequeued_t = q.dequeue() def dequeue(): # Expect the operation to fail due to the queue being closed. with self.assertRaisesRegexp(errors_impl.OutOfRangeError, "is closed and has insufficient"): sess.run(dequeued_t) dequeue_thread = self.checkedThread(target=dequeue) dequeue_thread.start() # The close_op should run after the dequeue_thread has blocked. # TODO(mrry): Figure out how to do this without sleeping. time.sleep(0.1) close_op.run() dequeue_thread.join() def testBlockingDequeueManyFromClosedQueue(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.float32, ((),)) elems = [10.0, 20.0, 30.0, 40.0] enqueue_op = q.enqueue_many((elems,)) close_op = q.close() dequeued_t = q.dequeue_many(4) enqueue_op.run() def dequeue(): self.assertAllEqual(elems, sess.run(dequeued_t)) # Expect the operation to fail due to the queue being closed. with self.assertRaisesRegexp(errors_impl.OutOfRangeError, "is closed and has insufficient"): sess.run(dequeued_t) dequeue_thread = self.checkedThread(target=dequeue) dequeue_thread.start() # The close_op should run after the dequeue_thread has blocked. # TODO(mrry): Figure out how to do this without sleeping. time.sleep(0.1) close_op.run() dequeue_thread.join() def testBlockingDequeueManyButNotAllFromClosedQueue(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.float32, ((),)) elems = [10.0, 20.0, 30.0, 40.0] enqueue_op = q.enqueue_many((elems,)) close_op = q.close() dequeued_t = q.dequeue_many(3) enqueue_op.run() def dequeue(): self.assertAllEqual(elems[:3], sess.run(dequeued_t)) # Expect the operation to fail due to the queue being closed. with self.assertRaisesRegexp(errors_impl.OutOfRangeError, "is closed and has insufficient"): sess.run(dequeued_t) dequeue_thread = self.checkedThread(target=dequeue) dequeue_thread.start() # The close_op should run after the dequeue_thread has blocked. # TODO(mrry): Figure out how to do this without sleeping. time.sleep(0.1) close_op.run() dequeue_thread.join() def testEnqueueManyLargerThanCapacityWithConcurrentDequeueMany(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue(4, dtypes_lib.float32, ((),)) elems = [10.0, 20.0, 30.0, 40.0] enqueue_op = q.enqueue_many((elems,)) close_op = q.close() dequeued_t = q.dequeue_many(3) cleanup_dequeue_t = q.dequeue() def enqueue(): sess.run(enqueue_op) def dequeue(): self.assertAllEqual(elems[0:3], sess.run(dequeued_t)) with self.assertRaises(errors_impl.OutOfRangeError): sess.run(dequeued_t) self.assertEqual(elems[3], sess.run(cleanup_dequeue_t)) def close(): sess.run(close_op) enqueue_thread = self.checkedThread(target=enqueue) enqueue_thread.start() dequeue_thread = self.checkedThread(target=dequeue) dequeue_thread.start() # The close_op should run after the dequeue_thread has blocked. # TODO(mrry): Figure out how to do this without sleeping. time.sleep(0.1) close_thread = self.checkedThread(target=close) close_thread.start() enqueue_thread.join() dequeue_thread.join() close_thread.join() def testClosedBlockingDequeueManyRestoresPartialBatch(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue(4, (dtypes_lib.float32, dtypes_lib.float32), ((), ())) elems_a = [1.0, 2.0, 3.0] elems_b = [10.0, 20.0, 30.0] enqueue_op = q.enqueue_many((elems_a, elems_b)) dequeued_a_t, dequeued_b_t = q.dequeue_many(4) cleanup_dequeue_a_t, cleanup_dequeue_b_t = q.dequeue() close_op = q.close() enqueue_op.run() def dequeue(): with self.assertRaises(errors_impl.OutOfRangeError): sess.run([dequeued_a_t, dequeued_b_t]) dequeue_thread = self.checkedThread(target=dequeue) dequeue_thread.start() # The close_op should run after the dequeue_thread has blocked. # TODO(mrry): Figure out how to do this without sleeping. time.sleep(0.1) close_op.run() dequeue_thread.join() # Test that the elements in the partially-dequeued batch are # restored in the correct order. for elem_a, elem_b in zip(elems_a, elems_b): val_a, val_b = sess.run([cleanup_dequeue_a_t, cleanup_dequeue_b_t]) self.assertEqual(elem_a, val_a) self.assertEqual(elem_b, val_b) self.assertEqual(0, q.size().eval()) def testBlockingDequeueManyFromClosedEmptyQueue(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.float32, ((),)) close_op = q.close() dequeued_t = q.dequeue_many(4) def dequeue(): # Expect the operation to fail due to the queue being closed. with self.assertRaisesRegexp(errors_impl.OutOfRangeError, "is closed and has insufficient"): sess.run(dequeued_t) dequeue_thread = self.checkedThread(target=dequeue) dequeue_thread.start() # The close_op should run after the dequeue_thread has blocked. # TODO(mrry): Figure out how to do this without sleeping. time.sleep(0.1) close_op.run() dequeue_thread.join() def testBlockingDequeueUpToFromClosedEmptyQueue(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.float32, ((),)) close_op = q.close() dequeued_t = q.dequeue_up_to(4) def dequeue(): # Expect the operation to fail due to the queue being closed. with self.assertRaisesRegexp(errors_impl.OutOfRangeError, "is closed and has insufficient"): sess.run(dequeued_t) dequeue_thread = self.checkedThread(target=dequeue) dequeue_thread.start() # The close_op should run after the dequeue_thread has blocked. # TODO(mrry): Figure out how to do this without sleeping. time.sleep(0.1) close_op.run() dequeue_thread.join() def testEnqueueToClosedQueue(self): with self.cached_session(): q = data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.float32, ((),)) enqueue_op = q.enqueue((10.0,)) close_op = q.close() enqueue_op.run() close_op.run() # Expect the operation to fail due to the queue being closed. with self.assertRaisesRegexp(errors_impl.CancelledError, "is closed"): enqueue_op.run() def testEnqueueManyToClosedQueue(self): with self.cached_session(): q = data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.float32, ((),)) elems = [10.0, 20.0, 30.0, 40.0] enqueue_op = q.enqueue_many((elems,)) close_op = q.close() enqueue_op.run() close_op.run() # Expect the operation to fail due to the queue being closed. with self.assertRaisesRegexp(errors_impl.CancelledError, "is closed"): enqueue_op.run() def testBlockingEnqueueToFullQueue(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue(4, dtypes_lib.float32, ((),)) elems = [10.0, 20.0, 30.0, 40.0] enqueue_op = q.enqueue_many((elems,)) blocking_enqueue_op = q.enqueue((50.0,)) dequeued_t = q.dequeue() enqueue_op.run() def blocking_enqueue(): sess.run(blocking_enqueue_op) thread = self.checkedThread(target=blocking_enqueue) thread.start() # The dequeue ops should run after the blocking_enqueue_op has blocked. # TODO(mrry): Figure out how to do this without sleeping. time.sleep(0.1) for elem in elems: self.assertEqual([elem], dequeued_t.eval()) self.assertEqual([50.0], dequeued_t.eval()) thread.join() def testBlockingEnqueueManyToFullQueue(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue(4, dtypes_lib.float32, ((),)) elems = [10.0, 20.0, 30.0, 40.0] enqueue_op = q.enqueue_many((elems,)) blocking_enqueue_op = q.enqueue_many(([50.0, 60.0],)) dequeued_t = q.dequeue() enqueue_op.run() def blocking_enqueue(): sess.run(blocking_enqueue_op) thread = self.checkedThread(target=blocking_enqueue) thread.start() # The dequeue ops should run after the blocking_enqueue_op has blocked. # TODO(mrry): Figure out how to do this without sleeping. time.sleep(0.1) for elem in elems: self.assertEqual([elem], dequeued_t.eval()) time.sleep(0.01) self.assertEqual([50.0], dequeued_t.eval()) self.assertEqual([60.0], dequeued_t.eval()) # Make sure the thread finishes before exiting. thread.join() def testBlockingEnqueueBeforeClose(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue(4, dtypes_lib.float32, ((),)) elems = [10.0, 20.0, 30.0, 40.0] enqueue_op = q.enqueue_many((elems,)) blocking_enqueue_op = q.enqueue((50.0,)) close_op = q.close() dequeued_t = q.dequeue() enqueue_op.run() def blocking_enqueue(): # Expect the operation to succeed once the dequeue op runs. sess.run(blocking_enqueue_op) enqueue_thread = self.checkedThread(target=blocking_enqueue) enqueue_thread.start() # The close_op should run after the blocking_enqueue_op has blocked. # TODO(mrry): Figure out how to do this without sleeping. time.sleep(0.1) def close(): sess.run(close_op) close_thread = self.checkedThread(target=close) close_thread.start() # The dequeue will unblock both threads. self.assertEqual(10.0, dequeued_t.eval()) enqueue_thread.join() close_thread.join() for elem in [20.0, 30.0, 40.0, 50.0]: self.assertEqual(elem, dequeued_t.eval()) self.assertEqual(0, q.size().eval()) def testBlockingEnqueueManyBeforeClose(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue(4, dtypes_lib.float32, ((),)) elems = [10.0, 20.0, 30.0] enqueue_op = q.enqueue_many((elems,)) blocking_enqueue_op = q.enqueue_many(([50.0, 60.0],)) close_op = q.close() dequeued_t = q.dequeue() enqueue_op.run() def blocking_enqueue(): sess.run(blocking_enqueue_op) enqueue_thread = self.checkedThread(target=blocking_enqueue) enqueue_thread.start() # The close_op should run after the blocking_enqueue_op has blocked. # TODO(mrry): Figure out how to do this without sleeping. time.sleep(0.1) def close(): sess.run(close_op) close_thread = self.checkedThread(target=close) close_thread.start() # The dequeue will unblock both threads. self.assertEqual(10.0, dequeued_t.eval()) enqueue_thread.join() close_thread.join() for elem in [20.0, 30.0, 50.0, 60.0]: self.assertEqual(elem, dequeued_t.eval()) def testDoesNotLoseValue(self): with self.cached_session(): q = data_flow_ops.PaddingFIFOQueue(1, dtypes_lib.float32, ((),)) enqueue_op = q.enqueue((10.0,)) size_t = q.size() enqueue_op.run() for _ in range(500): self.assertEqual(size_t.eval(), [1]) def testSharedQueueSameSession(self): with self.cached_session(): q1 = data_flow_ops.PaddingFIFOQueue( 1, dtypes_lib.float32, ((),), shared_name="shared_queue") q1.enqueue((10.0,)).run() q2 = data_flow_ops.PaddingFIFOQueue( 1, dtypes_lib.float32, ((),), shared_name="shared_queue") q1_size_t = q1.size() q2_size_t = q2.size() self.assertEqual(q1_size_t.eval(), [1]) self.assertEqual(q2_size_t.eval(), [1]) self.assertEqual(q2.dequeue().eval(), [10.0]) self.assertEqual(q1_size_t.eval(), [0]) self.assertEqual(q2_size_t.eval(), [0]) q2.enqueue((20.0,)).run() self.assertEqual(q1_size_t.eval(), [1]) self.assertEqual(q2_size_t.eval(), [1]) self.assertEqual(q1.dequeue().eval(), [20.0]) self.assertEqual(q1_size_t.eval(), [0]) self.assertEqual(q2_size_t.eval(), [0]) def testIncompatibleSharedQueueErrors(self): with self.cached_session(): q_a_1 = data_flow_ops.PaddingFIFOQueue( 10, dtypes_lib.float32, ((),), shared_name="q_a") q_a_2 = data_flow_ops.PaddingFIFOQueue( 15, dtypes_lib.float32, ((),), shared_name="q_a") q_a_1.queue_ref.op.run() with self.assertRaisesOpError("capacity"): q_a_2.queue_ref.op.run() q_b_1 = data_flow_ops.PaddingFIFOQueue( 10, dtypes_lib.float32, ((),), shared_name="q_b") q_b_2 = data_flow_ops.PaddingFIFOQueue( 10, dtypes_lib.int32, ((),), shared_name="q_b") q_b_1.queue_ref.op.run() with self.assertRaisesOpError("component types"): q_b_2.queue_ref.op.run() q_c_1 = data_flow_ops.PaddingFIFOQueue( 10, dtypes_lib.float32, ((),), shared_name="q_c") q_c_2 = data_flow_ops.PaddingFIFOQueue( 10, dtypes_lib.float32, shapes=[(1, 1, 2, 3)], shared_name="q_c") q_c_1.queue_ref.op.run() with self.assertRaisesOpError("component shapes"): q_c_2.queue_ref.op.run() q_d_1 = data_flow_ops.PaddingFIFOQueue( 10, dtypes_lib.float32, shapes=[(1, 1, 2, 3)], shared_name="q_d") q_d_2 = data_flow_ops.PaddingFIFOQueue( 10, dtypes_lib.float32, ((),), shared_name="q_d") q_d_1.queue_ref.op.run() with self.assertRaisesOpError("component shapes"): q_d_2.queue_ref.op.run() q_e_1 = data_flow_ops.PaddingFIFOQueue( 10, dtypes_lib.float32, shapes=[(1, 1, 2, 3)], shared_name="q_e") q_e_2 = data_flow_ops.PaddingFIFOQueue( 10, dtypes_lib.float32, shapes=[(1, 1, 2, 4)], shared_name="q_e") q_e_1.queue_ref.op.run() with self.assertRaisesOpError("component shapes"): q_e_2.queue_ref.op.run() q_f_1 = data_flow_ops.PaddingFIFOQueue( 10, dtypes_lib.float32, ((),), shared_name="q_f") q_f_2 = data_flow_ops.PaddingFIFOQueue( 10, (dtypes_lib.float32, dtypes_lib.int32), ((), ()), shared_name="q_f") q_f_1.queue_ref.op.run() with self.assertRaisesOpError("component types"): q_f_2.queue_ref.op.run() def testSelectQueue(self): with self.cached_session(): num_queues = 10 qlist = list() for _ in xrange(num_queues): qlist.append( data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.float32, ((),))) # Enqueue/Dequeue into a dynamically selected queue for _ in xrange(20): index = np.random.randint(num_queues) q = data_flow_ops.PaddingFIFOQueue.from_list(index, qlist) q.enqueue((10.,)).run() self.assertEqual(q.dequeue().eval(), 10.0) def testSelectQueueOutOfRange(self): with self.cached_session(): q1 = data_flow_ops.PaddingFIFOQueue(10, dtypes_lib.float32, ((),)) q2 = data_flow_ops.PaddingFIFOQueue(15, dtypes_lib.float32, ((),)) enq_q = data_flow_ops.PaddingFIFOQueue.from_list(3, [q1, q2]) with self.assertRaisesOpError("is not in"): enq_q.dequeue().eval() def _blockingDequeue(self, sess, dequeue_op): with self.assertRaisesOpError("was cancelled"): sess.run(dequeue_op) def _blockingDequeueMany(self, sess, dequeue_many_op): with self.assertRaisesOpError("was cancelled"): sess.run(dequeue_many_op) def _blockingEnqueue(self, sess, enqueue_op): with self.assertRaisesOpError("was cancelled"): sess.run(enqueue_op) def _blockingEnqueueMany(self, sess, enqueue_many_op): with self.assertRaisesOpError("was cancelled"): sess.run(enqueue_many_op) def testResetOfBlockingOperation(self): with self.cached_session() as sess: q_empty = data_flow_ops.PaddingFIFOQueue(5, dtypes_lib.float32, ((),)) dequeue_op = q_empty.dequeue() dequeue_many_op = q_empty.dequeue_many(1) q_full = data_flow_ops.PaddingFIFOQueue(5, dtypes_lib.float32, ((),)) sess.run(q_full.enqueue_many(([1.0, 2.0, 3.0, 4.0, 5.0],))) enqueue_op = q_full.enqueue((6.0,)) enqueue_many_op = q_full.enqueue_many(([6.0],)) threads = [ self.checkedThread( self._blockingDequeue, args=(sess, dequeue_op)), self.checkedThread( self._blockingDequeueMany, args=(sess, dequeue_many_op)), self.checkedThread( self._blockingEnqueue, args=(sess, enqueue_op)), self.checkedThread( self._blockingEnqueueMany, args=(sess, enqueue_many_op)) ] for t in threads: t.start() time.sleep(0.1) sess.close() # Will cancel the blocked operations. for t in threads: t.join() def testBigEnqueueMany(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue(5, dtypes_lib.int32, ((),)) elem = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] enq = q.enqueue_many((elem,)) deq = q.dequeue() size_op = q.size() enq_done = [] def blocking_enqueue(): enq_done.append(False) # This will fill the queue and then block until enough dequeues happen. sess.run(enq) enq_done.append(True) thread = self.checkedThread(target=blocking_enqueue) thread.start() # The enqueue should start and then block. results = [] results.append(deq.eval()) # Will only complete after the enqueue starts. self.assertEqual(len(enq_done), 1) self.assertEqual(sess.run(size_op), 5) for _ in range(3): results.append(deq.eval()) time.sleep(0.1) self.assertEqual(len(enq_done), 1) self.assertEqual(sess.run(size_op), 5) # This dequeue will unblock the thread. results.append(deq.eval()) time.sleep(0.1) self.assertEqual(len(enq_done), 2) thread.join() for i in range(5): self.assertEqual(size_op.eval(), 5 - i) results.append(deq.eval()) self.assertEqual(size_op.eval(), 5 - i - 1) self.assertAllEqual(elem, results) def testBigDequeueMany(self): with self.cached_session() as sess: q = data_flow_ops.PaddingFIFOQueue(2, dtypes_lib.int32, ((),)) elem = np.arange(4, dtype=np.int32) enq_list = [q.enqueue((e,)) for e in elem] deq = q.dequeue_many(4) results = [] def blocking_dequeue(): # Will only complete after 4 enqueues complete. results.extend(sess.run(deq)) thread = self.checkedThread(target=blocking_dequeue) thread.start() # The dequeue should start and then block. for enq in enq_list: # TODO(mrry): Figure out how to do this without sleeping. time.sleep(0.1) self.assertEqual(len(results), 0) sess.run(enq) # Enough enqueued to unblock the dequeue thread.join() self.assertAllEqual(elem, results) def testDtypes(self): with self.cached_session() as sess: dtypes = [ dtypes_lib.float32, dtypes_lib.float64, dtypes_lib.int32, dtypes_lib.uint8, dtypes_lib.int16, dtypes_lib.int8, dtypes_lib.int64, dtypes_lib.bool, dtypes_lib.complex64, dtypes_lib.complex128 ] shape = (32, 4, 128) q = data_flow_ops.PaddingFIFOQueue(32, dtypes, [shape[1:]] * len(dtypes)) input_tuple = [] for dtype in dtypes: np_dtype = dtype.as_numpy_dtype np_array = np.random.randint(-10, 10, shape) if dtype == dtypes_lib.bool: np_array = np_array > 0 elif dtype in (dtypes_lib.complex64, dtypes_lib.complex128): np_array = np.sqrt(np_array.astype(np_dtype)) else: np_array = np_array.astype(np_dtype) input_tuple.append(np_array) q.enqueue_many(input_tuple).run() output_tuple_t = q.dequeue_many(32) output_tuple = sess.run(output_tuple_t) for (input_elem, output_elem) in zip(input_tuple, output_tuple): self.assertAllEqual(input_elem, output_elem) def testUnknownRank(self): with self.assertRaisesRegexp(ValueError, "must have a defined rank"): data_flow_ops.PaddingFIFOQueue(32, [dtypes_lib.float32], [tensor_shape.TensorShape(None)]) class QueueFromListTest(test.TestCase): def testQueueFromListShapes(self): which = constant_op.constant(1) def _cmp(expected, *shapes): qs = [ data_flow_ops.PaddingFIFOQueue(10, [dtypes_lib.float32], [tensor_shape.TensorShape(s)]) for s in shapes ] s_expected = tensor_shape.TensorShape(expected) s = data_flow_ops.QueueBase.from_list(which, qs).shapes[0] if s_expected.ndims is None: self.assertEqual(s_expected.ndims, s.ndims) else: self.assertEqual(s_expected.as_list(), s.as_list()) _cmp(None, [1, None], [None]) _cmp([None], [1], [2]) _cmp([1, None], [1, 1], [1, 2]) _cmp([1, None], [1, 1], [1, None]) _cmp([None, None], [None, 1], [1, None]) _cmp([1], [1], [1], [1]) _cmp([None], [1], [None], [1]) _cmp(None, [1, None], [1], [1]) def testQueueFromListShapesMultipleComponents(self): q_u_u = data_flow_ops.PaddingFIFOQueue( 10, [dtypes_lib.float32, dtypes_lib.int32], [tensor_shape.TensorShape([None]), tensor_shape.TensorShape([None])]) q_u_f = data_flow_ops.PaddingFIFOQueue( 10, [dtypes_lib.float32, dtypes_lib.int32], [tensor_shape.TensorShape([None]), tensor_shape.TensorShape([1, 2])]) q_f_f = data_flow_ops.PaddingFIFOQueue( 10, [dtypes_lib.float32, dtypes_lib.int32], [tensor_shape.TensorShape([3, 4]), tensor_shape.TensorShape([1, 2])]) which = constant_op.constant(1) s_cmp_1 = data_flow_ops.QueueBase.from_list(which, [q_u_u, q_u_u, q_u_u]).shapes self.assertEqual([1, 1], [x.ndims for x in s_cmp_1]) self.assertEqual([None, None], [x.as_list()[0] for x in s_cmp_1]) s_cmp_2 = data_flow_ops.QueueBase.from_list(which, [q_u_u, q_u_u, q_u_f]).shapes self.assertEqual([1, None], [x.ndims for x in s_cmp_2]) self.assertEqual([None], s_cmp_2[0].as_list()) s_cmp_3 = data_flow_ops.QueueBase.from_list(which, [q_f_f, q_f_f]).shapes self.assertEqual([2, 2], [x.ndims for x in s_cmp_3]) self.assertEqual([[3, 4], [1, 2]], [x.as_list() for x in s_cmp_3]) if __name__ == "__main__": test.main()