diff options
Diffstat (limited to 'tensorflow/python/kernel_tests/fifo_queue_test.py')
-rw-r--r-- | tensorflow/python/kernel_tests/fifo_queue_test.py | 1043 |
1 files changed, 1043 insertions, 0 deletions
diff --git a/tensorflow/python/kernel_tests/fifo_queue_test.py b/tensorflow/python/kernel_tests/fifo_queue_test.py new file mode 100644 index 0000000000..57448db433 --- /dev/null +++ b/tensorflow/python/kernel_tests/fifo_queue_test.py @@ -0,0 +1,1043 @@ +"""Tests for tensorflow.ops.data_flow_ops.FIFOQueue.""" +import random +import re +import time + +import tensorflow.python.platform + +import numpy as np +import tensorflow as tf + + +class FIFOQueueTest(tf.test.TestCase): + + def testConstructor(self): + with tf.Graph().as_default(): + q = tf.FIFOQueue(10, tf.float32, name="Q") + self.assertTrue(isinstance(q.queue_ref, tf.Tensor)) + self.assertEquals(tf.string_ref, q.queue_ref.dtype) + self.assertProtoEquals(""" + name:'Q' op:'FIFOQueue' + attr { key: 'component_types' value { list { type: DT_FLOAT } } } + attr { key: 'shapes' value { list {} } } + attr { key: 'capacity' value { i: 10 } } + attr { key: 'container' value { s: '' } } + attr { key: 'shared_name' value { s: '' } } + """, q.queue_ref.op.node_def) + + def testMultiQueueConstructor(self): + with tf.Graph().as_default(): + q = tf.FIFOQueue(5, (tf.int32, tf.float32), + shared_name="foo", name="Q") + self.assertTrue(isinstance(q.queue_ref, tf.Tensor)) + self.assertEquals(tf.string_ref, q.queue_ref.dtype) + self.assertProtoEquals(""" + name:'Q' op:'FIFOQueue' + attr { key: 'component_types' value { list { + type: DT_INT32 type : DT_FLOAT + } } } + attr { key: 'shapes' value { list {} } } + attr { key: 'capacity' value { i: 5 } } + attr { key: 'container' value { s: '' } } + attr { key: 'shared_name' value { s: 'foo' } } + """, q.queue_ref.op.node_def) + + def testConstructorWithShapes(self): + with tf.Graph().as_default(): + q = tf.FIFOQueue(5, (tf.int32, tf.float32), + shapes=(tf.TensorShape([1, 1, 2, 3]), + tf.TensorShape([5, 8])), name="Q") + self.assertTrue(isinstance(q.queue_ref, tf.Tensor)) + self.assertEquals(tf.string_ref, q.queue_ref.dtype) + self.assertProtoEquals(""" + name:'Q' op:'FIFOQueue' + attr { key: 'component_types' value { list { + type: DT_INT32 type : DT_FLOAT + } } } + attr { key: 'shapes' value { list { + shape { dim { size: 1 } + dim { size: 1 } + dim { size: 2 } + dim { size: 3 } } + shape { dim { size: 5 } + dim { size: 8 } } + } } } + attr { key: 'capacity' value { i: 5 } } + attr { key: 'container' value { s: '' } } + attr { key: 'shared_name' value { s: '' } } + """, q.queue_ref.op.node_def) + + def testEnqueue(self): + with self.test_session(): + q = tf.FIFOQueue(10, tf.float32) + enqueue_op = q.enqueue((10.0,)) + enqueue_op.run() + + def testEnqueueWithShape(self): + with self.test_session(): + q = tf.FIFOQueue(10, tf.float32, shapes=(3, 2)) + enqueue_correct_op = q.enqueue(([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]],)) + enqueue_correct_op.run() + with self.assertRaises(ValueError): + q.enqueue(([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]],)) + self.assertEqual(1, q.size().eval()) + + def testEnqueueManyWithShape(self): + with self.test_session(): + q = tf.FIFOQueue(10, [tf.int32, tf.int32], + shapes=[(), (2,)]) + q.enqueue_many([[1, 2, 3, 4], [[1, 1], [2, 2], [3, 3], [4, 4]]]).run() + self.assertEqual(4, q.size().eval()) + + def testParallelEnqueue(self): + with self.test_session() as sess: + q = tf.FIFOQueue(10, tf.float32) + elems = [10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] + enqueue_ops = [q.enqueue((x,)) for x in elems] + dequeued_t = q.dequeue() + + # Run one producer thread for each element in elems. + def enqueue(enqueue_op): + sess.run(enqueue_op) + threads = [self.checkedThread(target=enqueue, args=(e,)) + for e in enqueue_ops] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + # Dequeue every element using a single thread. + results = [] + for _ in xrange(len(elems)): + results.append(dequeued_t.eval()) + self.assertItemsEqual(elems, results) + + def testParallelDequeue(self): + with self.test_session() as sess: + q = tf.FIFOQueue(10, tf.float32) + elems = [10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] + enqueue_ops = [q.enqueue((x,)) for x in elems] + dequeued_t = q.dequeue() + + # Enqueue every element using a single thread. + for enqueue_op in enqueue_ops: + enqueue_op.run() + + # Run one consumer thread for each element in elems. + results = [] + + def dequeue(): + results.append(sess.run(dequeued_t)) + threads = [self.checkedThread(target=dequeue) for _ in enqueue_ops] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + self.assertItemsEqual(elems, results) + + def testDequeue(self): + with self.test_session(): + q = tf.FIFOQueue(10, tf.float32) + elems = [10.0, 20.0, 30.0] + enqueue_ops = [q.enqueue((x,)) for x in elems] + dequeued_t = q.dequeue() + + for enqueue_op in enqueue_ops: + enqueue_op.run() + + for i in xrange(len(elems)): + vals = dequeued_t.eval() + self.assertEqual([elems[i]], vals) + + def testEnqueueAndBlockingDequeue(self): + with self.test_session() as sess: + q = tf.FIFOQueue(3, tf.float32) + elems = [10.0, 20.0, 30.0] + enqueue_ops = [q.enqueue((x,)) for x in elems] + dequeued_t = q.dequeue() + + def enqueue(): + # The enqueue_ops should run after the dequeue op has blocked. + # TODO(mrry): Figure out how to do this without sleeping. + time.sleep(0.1) + for enqueue_op in enqueue_ops: + sess.run(enqueue_op) + + results = [] + + def dequeue(): + for _ in xrange(len(elems)): + results.append(sess.run(dequeued_t)) + + enqueue_thread = self.checkedThread(target=enqueue) + dequeue_thread = self.checkedThread(target=dequeue) + enqueue_thread.start() + dequeue_thread.start() + enqueue_thread.join() + dequeue_thread.join() + + for elem, result in zip(elems, results): + self.assertEqual([elem], result) + + def testMultiEnqueueAndDequeue(self): + with self.test_session() as sess: + q = tf.FIFOQueue(10, (tf.int32, tf.float32)) + elems = [(5, 10.0), (10, 20.0), (15, 30.0)] + enqueue_ops = [q.enqueue((x, y)) for x, y in elems] + dequeued_t = q.dequeue() + + for enqueue_op in enqueue_ops: + enqueue_op.run() + + for i in xrange(len(elems)): + x_val, y_val = sess.run(dequeued_t) + x, y = elems[i] + self.assertEqual([x], x_val) + self.assertEqual([y], y_val) + + def testQueueSizeEmpty(self): + with self.test_session(): + q = tf.FIFOQueue(10, tf.float32) + self.assertEqual([0], q.size().eval()) + + def testQueueSizeAfterEnqueueAndDequeue(self): + with self.test_session(): + q = tf.FIFOQueue(10, tf.float32) + enqueue_op = q.enqueue((10.0,)) + dequeued_t = q.dequeue() + size = q.size() + self.assertEqual([], size.get_shape()) + + enqueue_op.run() + self.assertEqual(1, size.eval()) + dequeued_t.op.run() + self.assertEqual(0, size.eval()) + + def testEnqueueMany(self): + with self.test_session(): + q = tf.FIFOQueue(10, tf.float32) + elems = [10.0, 20.0, 30.0, 40.0] + enqueue_op = q.enqueue_many((elems,)) + dequeued_t = q.dequeue() + enqueue_op.run() + enqueue_op.run() + + for i in range(8): + vals = dequeued_t.eval() + self.assertEqual([elems[i % 4]], vals) + + def testEmptyEnqueueMany(self): + with self.test_session(): + q = tf.FIFOQueue(10, tf.float32) + empty_t = tf.constant([], dtype=tf.float32, + shape=[0, 2, 3]) + enqueue_op = q.enqueue_many((empty_t,)) + size_t = q.size() + + self.assertEqual([0], size_t.eval()) + enqueue_op.run() + self.assertEqual([0], size_t.eval()) + + def testEmptyDequeueMany(self): + with self.test_session(): + q = tf.FIFOQueue(10, tf.float32, shapes=()) + enqueue_op = q.enqueue((10.0,)) + dequeued_t = q.dequeue_many(0) + + self.assertEqual([], dequeued_t.eval().tolist()) + enqueue_op.run() + self.assertEqual([], dequeued_t.eval().tolist()) + + def testEmptyDequeueManyWithNoShape(self): + with self.test_session(): + q = tf.FIFOQueue(10, tf.float32) + # Expect the operation to fail due to the shape not being constrained. + with self.assertRaisesOpError("specified shapes"): + q.dequeue_many(0).eval() + + def testMultiEnqueueMany(self): + with self.test_session() as sess: + q = tf.FIFOQueue(10, (tf.float32, tf.int32)) + float_elems = [10.0, 20.0, 30.0, 40.0] + int_elems = [[1, 2], [3, 4], [5, 6], [7, 8]] + enqueue_op = q.enqueue_many((float_elems, int_elems)) + dequeued_t = q.dequeue() + + enqueue_op.run() + enqueue_op.run() + + for i in range(8): + float_val, int_val = sess.run(dequeued_t) + self.assertEqual(float_elems[i % 4], float_val) + self.assertAllEqual(int_elems[i % 4], int_val) + + def testDequeueMany(self): + with self.test_session(): + q = tf.FIFOQueue(10, tf.float32, ()) + elems = [10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] + enqueue_op = q.enqueue_many((elems,)) + dequeued_t = q.dequeue_many(4) + + enqueue_op.run() + + self.assertAllEqual(elems[0:4], dequeued_t.eval()) + self.assertAllEqual(elems[4:8], dequeued_t.eval()) + + def testMultiDequeueMany(self): + with self.test_session() as sess: + q = tf.FIFOQueue(10, (tf.float32, tf.int32), + shapes=((), (2,))) + float_elems = [ + 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] + int_elems = [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10], + [11, 12], [13, 14], [15, 16], [17, 18], [19, 20]] + enqueue_op = q.enqueue_many((float_elems, int_elems)) + dequeued_t = q.dequeue_many(4) + dequeued_single_t = q.dequeue() + + enqueue_op.run() + + float_val, int_val = sess.run(dequeued_t) + self.assertAllEqual(float_elems[0:4], float_val) + self.assertAllEqual(int_elems[0:4], int_val) + self.assertEqual(float_val.shape, dequeued_t[0].get_shape()) + self.assertEqual(int_val.shape, dequeued_t[1].get_shape()) + + float_val, int_val = sess.run(dequeued_t) + self.assertAllEqual(float_elems[4:8], float_val) + self.assertAllEqual(int_elems[4:8], int_val) + + float_val, int_val = sess.run(dequeued_single_t) + self.assertAllEqual(float_elems[8], float_val) + self.assertAllEqual(int_elems[8], int_val) + self.assertEqual(float_val.shape, dequeued_single_t[0].get_shape()) + self.assertEqual(int_val.shape, dequeued_single_t[1].get_shape()) + + def testHighDimension(self): + with self.test_session(): + q = tf.FIFOQueue(10, tf.int32, (4, 4, 4, 4)) + elems = np.array([[[[[x] * 4] * 4] * 4] * 4 for x in range(10)], np.int32) + enqueue_op = q.enqueue_many((elems,)) + dequeued_t = q.dequeue_many(10) + + enqueue_op.run() + self.assertAllEqual(dequeued_t.eval(), elems) + + def testParallelEnqueueMany(self): + with self.test_session() as sess: + q = tf.FIFOQueue(1000, tf.float32, shapes=()) + elems = [10.0 * x for x in range(100)] + enqueue_op = q.enqueue_many((elems,)) + dequeued_t = q.dequeue_many(1000) + + # Enqueue 100 items in parallel on 10 threads. + def enqueue(): + sess.run(enqueue_op) + threads = [self.checkedThread(target=enqueue) for _ in range(10)] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + self.assertItemsEqual(dequeued_t.eval(), elems * 10) + + def testParallelDequeueMany(self): + with self.test_session() as sess: + q = tf.FIFOQueue(1000, tf.float32, shapes=()) + elems = [10.0 * x for x in range(1000)] + enqueue_op = q.enqueue_many((elems,)) + dequeued_t = q.dequeue_many(100) + + enqueue_op.run() + + # Dequeue 100 items in parallel on 10 threads. + dequeued_elems = [] + + def dequeue(): + dequeued_elems.extend(sess.run(dequeued_t)) + threads = [self.checkedThread(target=dequeue) for _ in range(10)] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + self.assertItemsEqual(elems, dequeued_elems) + + def testParallelEnqueueAndDequeue(self): + with self.test_session() as sess: + q = tf.FIFOQueue(50, tf.float32, shapes=()) + initial_elements = [10.0] * 49 + q.enqueue_many((initial_elements,)).run() + + enqueue_op = q.enqueue((20.0,)) + dequeued_t = q.dequeue() + + def enqueue(): + for _ in xrange(100): + sess.run(enqueue_op) + def dequeue(): + for _ in xrange(100): + self.assertTrue(sess.run(dequeued_t) in (10.0, 20.0)) + + enqueue_threads = [self.checkedThread(target=enqueue) for _ in range(10)] + dequeue_threads = [self.checkedThread(target=dequeue) for _ in range(10)] + for enqueue_thread in enqueue_threads: + enqueue_thread.start() + for dequeue_thread in dequeue_threads: + dequeue_thread.start() + for enqueue_thread in enqueue_threads: + enqueue_thread.join() + for dequeue_thread in dequeue_threads: + dequeue_thread.join() + + # Dequeue the initial count of elements to clean up. + cleanup_elems = q.dequeue_many(49).eval() + for elem in cleanup_elems: + self.assertTrue(elem in (10.0, 20.0)) + + def testMixtureOfEnqueueAndEnqueueMany(self): + with self.test_session() as sess: + q = tf.FIFOQueue(10, tf.int32, shapes=()) + enqueue_placeholder = tf.placeholder(tf.int32, shape=()) + enqueue_op = q.enqueue((enqueue_placeholder,)) + enqueuemany_placeholder = tf.placeholder( + tf.int32, shape=(None,)) + enqueuemany_op = q.enqueue_many((enqueuemany_placeholder,)) + + dequeued_t = q.dequeue() + close_op = q.close() + + def dequeue(): + for i in xrange(250): + self.assertEqual(i, sess.run(dequeued_t)) + dequeue_thread = self.checkedThread(target=dequeue) + dequeue_thread.start() + + elements_enqueued = 0 + while elements_enqueued < 250: + # With equal probability, run Enqueue or enqueue_many. + if random.random() > 0.5: + enqueue_op.run({enqueue_placeholder: elements_enqueued}) + elements_enqueued += 1 + else: + count = random.randint(0, min(20, 250 - elements_enqueued)) + range_to_enqueue = range(elements_enqueued, elements_enqueued + count) + enqueuemany_op.run({enqueuemany_placeholder: range_to_enqueue}) + elements_enqueued += count + + close_op.run() + dequeue_thread.join() + self.assertEqual(0, q.size().eval()) + + def testMixtureOfDequeueAndDequeueMany(self): + with self.test_session() as sess: + q = tf.FIFOQueue(10, tf.int32, shapes=()) + enqueue_op = q.enqueue_many((range(250),)) + dequeued_t = q.dequeue() + count_placeholder = tf.placeholder(tf.int32, shape=()) + dequeuemany_t = q.dequeue_many(count_placeholder) + + def enqueue(): + sess.run(enqueue_op) + enqueue_thread = self.checkedThread(target=enqueue) + enqueue_thread.start() + + elements_dequeued = 0 + while elements_dequeued < 250: + # With equal probability, run Dequeue or dequeue_many. + if random.random() > 0.5: + self.assertEqual(elements_dequeued, dequeued_t.eval()) + elements_dequeued += 1 + else: + count = random.randint(0, min(20, 250 - elements_dequeued)) + expected_range = range(elements_dequeued, elements_dequeued + count) + self.assertAllEqual( + expected_range, dequeuemany_t.eval({count_placeholder: count})) + elements_dequeued += count + + q.close().run() + enqueue_thread.join() + self.assertEqual(0, q.size().eval()) + + def testBlockingDequeueMany(self): + with self.test_session() as sess: + q = tf.FIFOQueue(10, tf.float32, ()) + elems = [10.0, 20.0, 30.0, 40.0] + enqueue_op = q.enqueue_many((elems,)) + dequeued_t = q.dequeue_many(4) + + dequeued_elems = [] + + def enqueue(): + # The enqueue_op should run after the dequeue op has blocked. + # TODO(mrry): Figure out how to do this without sleeping. + time.sleep(0.1) + sess.run(enqueue_op) + + def dequeue(): + dequeued_elems.extend(sess.run(dequeued_t).tolist()) + + enqueue_thread = self.checkedThread(target=enqueue) + dequeue_thread = self.checkedThread(target=dequeue) + enqueue_thread.start() + dequeue_thread.start() + enqueue_thread.join() + dequeue_thread.join() + + self.assertAllEqual(elems, dequeued_elems) + + def testDequeueManyWithTensorParameter(self): + with self.test_session(): + # Define a first queue that contains integer counts. + dequeue_counts = [random.randint(1, 10) for _ in range(100)] + count_q = tf.FIFOQueue(100, tf.int32, ()) + enqueue_counts_op = count_q.enqueue_many((dequeue_counts,)) + total_count = sum(dequeue_counts) + + # Define a second queue that contains total_count elements. + elems = [random.randint(0, 100) for _ in range(total_count)] + q = tf.FIFOQueue(total_count, tf.int32, ()) + enqueue_elems_op = q.enqueue_many((elems,)) + + # Define a subgraph that first dequeues a count, then DequeuesMany + # that number of elements. + dequeued_t = q.dequeue_many(count_q.dequeue()) + + enqueue_counts_op.run() + enqueue_elems_op.run() + + dequeued_elems = [] + for _ in dequeue_counts: + dequeued_elems.extend(dequeued_t.eval()) + self.assertEqual(elems, dequeued_elems) + + def testDequeueFromClosedQueue(self): + with self.test_session(): + q = tf.FIFOQueue(10, tf.float32) + elems = [10.0, 20.0, 30.0, 40.0] + enqueue_op = q.enqueue_many((elems,)) + close_op = q.close() + dequeued_t = q.dequeue() + + enqueue_op.run() + close_op.run() + for elem in elems: + self.assertEqual([elem], dequeued_t.eval()) + + # Expect the operation to fail due to the queue being closed. + with self.assertRaisesRegexp(tf.errors.OutOfRangeError, + "is closed and has insufficient"): + dequeued_t.eval() + + def testBlockingDequeueFromClosedQueue(self): + with self.test_session() as sess: + q = tf.FIFOQueue(10, tf.float32) + elems = [10.0, 20.0, 30.0, 40.0] + enqueue_op = q.enqueue_many((elems,)) + close_op = q.close() + dequeued_t = q.dequeue() + + enqueue_op.run() + + def dequeue(): + for elem in elems: + self.assertEqual([elem], sess.run(dequeued_t)) + # Expect the operation to fail due to the queue being closed. + with self.assertRaisesRegexp(tf.errors.OutOfRangeError, + "is closed and has insufficient"): + sess.run(dequeued_t) + + dequeue_thread = self.checkedThread(target=dequeue) + dequeue_thread.start() + # The close_op should run after the dequeue_thread has blocked. + # TODO(mrry): Figure out how to do this without sleeping. + time.sleep(0.1) + close_op.run() + dequeue_thread.join() + + def testBlockingDequeueFromClosedEmptyQueue(self): + with self.test_session() as sess: + q = tf.FIFOQueue(10, tf.float32) + close_op = q.close() + dequeued_t = q.dequeue() + + def dequeue(): + # Expect the operation to fail due to the queue being closed. + with self.assertRaisesRegexp(tf.errors.OutOfRangeError, + "is closed and has insufficient"): + sess.run(dequeued_t) + + dequeue_thread = self.checkedThread(target=dequeue) + dequeue_thread.start() + # The close_op should run after the dequeue_thread has blocked. + # TODO(mrry): Figure out how to do this without sleeping. + time.sleep(0.1) + close_op.run() + dequeue_thread.join() + + def testBlockingDequeueManyFromClosedQueue(self): + with self.test_session() as sess: + q = tf.FIFOQueue(10, tf.float32, ()) + elems = [10.0, 20.0, 30.0, 40.0] + enqueue_op = q.enqueue_many((elems,)) + close_op = q.close() + dequeued_t = q.dequeue_many(4) + + enqueue_op.run() + + def dequeue(): + self.assertAllEqual(elems, sess.run(dequeued_t)) + # Expect the operation to fail due to the queue being closed. + with self.assertRaisesRegexp(tf.errors.OutOfRangeError, + "is closed and has insufficient"): + sess.run(dequeued_t) + + dequeue_thread = self.checkedThread(target=dequeue) + dequeue_thread.start() + # The close_op should run after the dequeue_thread has blocked. + # TODO(mrry): Figure out how to do this without sleeping. + time.sleep(0.1) + close_op.run() + dequeue_thread.join() + + def testEnqueueManyLargerThanCapacityWithConcurrentDequeueMany(self): + with self.test_session() as sess: + q = tf.FIFOQueue(4, tf.float32, ()) + elems = [10.0, 20.0, 30.0, 40.0] + enqueue_op = q.enqueue_many((elems,)) + close_op = q.close() + dequeued_t = q.dequeue_many(3) + cleanup_dequeue_t = q.dequeue() + + def enqueue(): + sess.run(enqueue_op) + + def dequeue(): + self.assertAllEqual(elems[0:3], sess.run(dequeued_t)) + with self.assertRaises(tf.errors.OutOfRangeError): + sess.run(dequeued_t) + self.assertEqual(elems[3], sess.run(cleanup_dequeue_t)) + + def close(): + sess.run(close_op) + + enqueue_thread = self.checkedThread(target=enqueue) + enqueue_thread.start() + + dequeue_thread = self.checkedThread(target=dequeue) + dequeue_thread.start() + # The close_op should run after the dequeue_thread has blocked. + # TODO(mrry): Figure out how to do this without sleeping. + time.sleep(0.1) + + close_thread = self.checkedThread(target=close) + close_thread.start() + + enqueue_thread.join() + dequeue_thread.join() + close_thread.join() + + def testClosedBlockingDequeueManyRestoresPartialBatch(self): + with self.test_session() as sess: + q = tf.FIFOQueue(4, (tf.float32, tf.float32), ((), ())) + elems_a = [1.0, 2.0, 3.0] + elems_b = [10.0, 20.0, 30.0] + enqueue_op = q.enqueue_many((elems_a, elems_b)) + dequeued_a_t, dequeued_b_t = q.dequeue_many(4) + cleanup_dequeue_a_t, cleanup_dequeue_b_t = q.dequeue() + close_op = q.close() + + enqueue_op.run() + + def dequeue(): + with self.assertRaises(tf.errors.OutOfRangeError): + sess.run([dequeued_a_t, dequeued_b_t]) + + dequeue_thread = self.checkedThread(target=dequeue) + dequeue_thread.start() + # The close_op should run after the dequeue_thread has blocked. + # TODO(mrry): Figure out how to do this without sleeping. + time.sleep(0.1) + + close_op.run() + dequeue_thread.join() + # Test that the elements in the partially-dequeued batch are + # restored in the correct order. + for elem_a, elem_b in zip(elems_a, elems_b): + val_a, val_b = sess.run([cleanup_dequeue_a_t, cleanup_dequeue_b_t]) + self.assertEqual(elem_a, val_a) + self.assertEqual(elem_b, val_b) + self.assertEqual(0, q.size().eval()) + + def testBlockingDequeueManyFromClosedEmptyQueue(self): + with self.test_session() as sess: + q = tf.FIFOQueue(10, tf.float32, ()) + close_op = q.close() + dequeued_t = q.dequeue_many(4) + + def dequeue(): + # Expect the operation to fail due to the queue being closed. + with self.assertRaisesRegexp(tf.errors.OutOfRangeError, + "is closed and has insufficient"): + sess.run(dequeued_t) + + dequeue_thread = self.checkedThread(target=dequeue) + dequeue_thread.start() + # The close_op should run after the dequeue_thread has blocked. + # TODO(mrry): Figure out how to do this without sleeping. + time.sleep(0.1) + close_op.run() + dequeue_thread.join() + + def testEnqueueToClosedQueue(self): + with self.test_session(): + q = tf.FIFOQueue(10, tf.float32) + enqueue_op = q.enqueue((10.0,)) + close_op = q.close() + + enqueue_op.run() + close_op.run() + + # Expect the operation to fail due to the queue being closed. + with self.assertRaisesRegexp(tf.errors.AbortedError, "is closed"): + enqueue_op.run() + + def testEnqueueManyToClosedQueue(self): + with self.test_session(): + q = tf.FIFOQueue(10, tf.float32) + elems = [10.0, 20.0, 30.0, 40.0] + enqueue_op = q.enqueue_many((elems,)) + close_op = q.close() + + enqueue_op.run() + close_op.run() + + # Expect the operation to fail due to the queue being closed. + with self.assertRaisesRegexp(tf.errors.AbortedError, "is closed"): + enqueue_op.run() + + def testBlockingEnqueueToFullQueue(self): + with self.test_session() as sess: + q = tf.FIFOQueue(4, tf.float32) + elems = [10.0, 20.0, 30.0, 40.0] + enqueue_op = q.enqueue_many((elems,)) + blocking_enqueue_op = q.enqueue((50.0,)) + dequeued_t = q.dequeue() + + enqueue_op.run() + + def blocking_enqueue(): + sess.run(blocking_enqueue_op) + thread = self.checkedThread(target=blocking_enqueue) + thread.start() + # The dequeue ops should run after the blocking_enqueue_op has blocked. + # TODO(mrry): Figure out how to do this without sleeping. + time.sleep(0.1) + for elem in elems: + self.assertEqual([elem], dequeued_t.eval()) + self.assertEqual([50.0], dequeued_t.eval()) + thread.join() + + def testBlockingEnqueueManyToFullQueue(self): + with self.test_session() as sess: + q = tf.FIFOQueue(4, tf.float32) + elems = [10.0, 20.0, 30.0, 40.0] + enqueue_op = q.enqueue_many((elems,)) + blocking_enqueue_op = q.enqueue_many(([50.0, 60.0],)) + dequeued_t = q.dequeue() + + enqueue_op.run() + + def blocking_enqueue(): + sess.run(blocking_enqueue_op) + thread = self.checkedThread(target=blocking_enqueue) + thread.start() + # The dequeue ops should run after the blocking_enqueue_op has blocked. + # TODO(mrry): Figure out how to do this without sleeping. + time.sleep(0.1) + for elem in elems: + self.assertEqual([elem], dequeued_t.eval()) + time.sleep(0.01) + self.assertEqual([50.0], dequeued_t.eval()) + self.assertEqual([60.0], dequeued_t.eval()) + + def testBlockingEnqueueBeforeClose(self): + with self.test_session() as sess: + q = tf.FIFOQueue(4, tf.float32) + elems = [10.0, 20.0, 30.0, 40.0] + enqueue_op = q.enqueue_many((elems,)) + blocking_enqueue_op = q.enqueue((50.0,)) + close_op = q.close() + dequeued_t = q.dequeue() + + enqueue_op.run() + + def blocking_enqueue(): + # Expect the operation to succeed once the dequeue op runs. + sess.run(blocking_enqueue_op) + enqueue_thread = self.checkedThread(target=blocking_enqueue) + enqueue_thread.start() + + # The close_op should run after the blocking_enqueue_op has blocked. + # TODO(mrry): Figure out how to do this without sleeping. + time.sleep(0.1) + + def close(): + sess.run(close_op) + close_thread = self.checkedThread(target=close) + close_thread.start() + + # The dequeue will unblock both threads. + self.assertEqual(10.0, dequeued_t.eval()) + enqueue_thread.join() + close_thread.join() + + for elem in [20.0, 30.0, 40.0, 50.0]: + self.assertEqual(elem, dequeued_t.eval()) + self.assertEqual(0, q.size().eval()) + + def testBlockingEnqueueManyBeforeClose(self): + with self.test_session() as sess: + q = tf.FIFOQueue(4, tf.float32) + elems = [10.0, 20.0, 30.0] + enqueue_op = q.enqueue_many((elems,)) + blocking_enqueue_op = q.enqueue_many(([50.0, 60.0],)) + close_op = q.close() + dequeued_t = q.dequeue() + enqueue_op.run() + + def blocking_enqueue(): + sess.run(blocking_enqueue_op) + enqueue_thread = self.checkedThread(target=blocking_enqueue) + enqueue_thread.start() + + # The close_op should run after the blocking_enqueue_op has blocked. + # TODO(mrry): Figure out how to do this without sleeping. + time.sleep(0.1) + + def close(): + sess.run(close_op) + close_thread = self.checkedThread(target=close) + close_thread.start() + + # The dequeue will unblock both threads. + self.assertEqual(10.0, dequeued_t.eval()) + enqueue_thread.join() + close_thread.join() + for elem in [20.0, 30.0, 50.0, 60.0]: + self.assertEqual(elem, dequeued_t.eval()) + + def testDoesNotLoseValue(self): + with self.test_session(): + q = tf.FIFOQueue(1, tf.float32) + enqueue_op = q.enqueue((10.0,)) + size_t = q.size() + + enqueue_op.run() + for _ in range(500): + self.assertEqual(size_t.eval(), [1]) + + def testSharedQueueSameSession(self): + with self.test_session(): + q1 = tf.FIFOQueue( + 1, tf.float32, shared_name="shared_queue") + q1.enqueue((10.0,)).run() + + q2 = tf.FIFOQueue( + 1, tf.float32, shared_name="shared_queue") + + q1_size_t = q1.size() + q2_size_t = q2.size() + + self.assertEqual(q1_size_t.eval(), [1]) + self.assertEqual(q2_size_t.eval(), [1]) + + self.assertEqual(q2.dequeue().eval(), [10.0]) + + self.assertEqual(q1_size_t.eval(), [0]) + self.assertEqual(q2_size_t.eval(), [0]) + + q2.enqueue((20.0,)).run() + + self.assertEqual(q1_size_t.eval(), [1]) + self.assertEqual(q2_size_t.eval(), [1]) + + self.assertEqual(q1.dequeue().eval(), [20.0]) + + self.assertEqual(q1_size_t.eval(), [0]) + self.assertEqual(q2_size_t.eval(), [0]) + + def testIncompatibleSharedQueueErrors(self): + with self.test_session(): + q_a_1 = tf.FIFOQueue(10, tf.float32, shared_name="q_a") + q_a_2 = tf.FIFOQueue(15, tf.float32, shared_name="q_a") + q_a_1.queue_ref.eval() + with self.assertRaisesOpError("capacity"): + q_a_2.queue_ref.eval() + + q_b_1 = tf.FIFOQueue(10, tf.float32, shared_name="q_b") + q_b_2 = tf.FIFOQueue(10, tf.int32, shared_name="q_b") + q_b_1.queue_ref.eval() + with self.assertRaisesOpError("component types"): + q_b_2.queue_ref.eval() + + q_c_1 = tf.FIFOQueue(10, tf.float32, shared_name="q_c") + q_c_2 = tf.FIFOQueue( + 10, tf.float32, shapes=[(1, 1, 2, 3)], shared_name="q_c") + q_c_1.queue_ref.eval() + with self.assertRaisesOpError("component shapes"): + q_c_2.queue_ref.eval() + + q_d_1 = tf.FIFOQueue( + 10, tf.float32, shapes=[(1, 1, 2, 3)], shared_name="q_d") + q_d_2 = tf.FIFOQueue(10, tf.float32, shared_name="q_d") + q_d_1.queue_ref.eval() + with self.assertRaisesOpError("component shapes"): + q_d_2.queue_ref.eval() + + q_e_1 = tf.FIFOQueue( + 10, tf.float32, shapes=[(1, 1, 2, 3)], shared_name="q_e") + q_e_2 = tf.FIFOQueue( + 10, tf.float32, shapes=[(1, 1, 2, 4)], shared_name="q_e") + q_e_1.queue_ref.eval() + with self.assertRaisesOpError("component shapes"): + q_e_2.queue_ref.eval() + + q_f_1 = tf.FIFOQueue(10, tf.float32, shared_name="q_f") + q_f_2 = tf.FIFOQueue( + 10, (tf.float32, tf.int32), shared_name="q_f") + q_f_1.queue_ref.eval() + with self.assertRaisesOpError("component types"): + q_f_2.queue_ref.eval() + + def testSelectQueue(self): + with self.test_session(): + num_queues = 10 + qlist = list() + for _ in xrange(num_queues): + qlist.append(tf.FIFOQueue(10, tf.float32)) + # Enqueue/Dequeue into a dynamically selected queue + for _ in xrange(20): + index = np.random.randint(num_queues) + q = tf.FIFOQueue.from_list(index, qlist) + q.enqueue((10.,)).run() + self.assertEqual(q.dequeue().eval(), 10.0) + + def testSelectQueueOutOfRange(self): + with self.test_session(): + q1 = tf.FIFOQueue(10, tf.float32) + q2 = tf.FIFOQueue(15, tf.float32) + enq_q = tf.FIFOQueue.from_list(3, [q1, q2]) + with self.assertRaisesOpError("Index must be in the range"): + enq_q.dequeue().eval() + + def _blockingDequeue(self, sess, dequeue_op): + with self.assertRaisesOpError("Dequeue operation was cancelled"): + sess.run(dequeue_op) + + def _blockingDequeueMany(self, sess, dequeue_many_op): + with self.assertRaisesOpError("Dequeue operation was cancelled"): + sess.run(dequeue_many_op) + + def _blockingEnqueue(self, sess, enqueue_op): + with self.assertRaisesOpError("Enqueue operation was cancelled"): + sess.run(enqueue_op) + + def _blockingEnqueueMany(self, sess, enqueue_many_op): + with self.assertRaisesOpError("Enqueue operation was cancelled"): + sess.run(enqueue_many_op) + + def testResetOfBlockingOperation(self): + with self.test_session() as sess: + q_empty = tf.FIFOQueue(5, tf.float32, ()) + dequeue_op = q_empty.dequeue() + dequeue_many_op = q_empty.dequeue_many(1) + + q_full = tf.FIFOQueue(5, tf.float32) + sess.run(q_full.enqueue_many(([1.0, 2.0, 3.0, 4.0, 5.0],))) + enqueue_op = q_full.enqueue((6.0,)) + enqueue_many_op = q_full.enqueue_many(([6.0],)) + + threads = [ + self.checkedThread(self._blockingDequeue, args=(sess, dequeue_op)), + self.checkedThread(self._blockingDequeueMany, args=(sess, + dequeue_many_op)), + self.checkedThread(self._blockingEnqueue, args=(sess, enqueue_op)), + self.checkedThread(self._blockingEnqueueMany, args=(sess, + enqueue_many_op))] + for t in threads: + t.start() + time.sleep(0.1) + sess.close() # Will cancel the blocked operations. + for t in threads: + t.join() + + def testBigEnqueueMany(self): + with self.test_session() as sess: + q = tf.FIFOQueue(5, tf.int32, ((),)) + elem = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + enq = q.enqueue_many((elem,)) + deq = q.dequeue() + size_op = q.size() + + enq_done = [] + def blocking_enqueue(): + enq_done.append(False) + # This will fill the queue and then block until enough dequeues happen. + sess.run(enq) + enq_done.append(True) + thread = self.checkedThread(target=blocking_enqueue) + thread.start() + + # The enqueue should start and then block. + results = [] + results.append(deq.eval()) # Will only complete after the enqueue starts. + self.assertEqual(len(enq_done), 1) + self.assertEqual(sess.run(size_op), 5) + + for _ in range(3): + results.append(deq.eval()) + + time.sleep(0.1) + self.assertEqual(len(enq_done), 1) + self.assertEqual(sess.run(size_op), 5) + + # This dequeue will unblock the thread. + results.append(deq.eval()) + time.sleep(0.1) + self.assertEqual(len(enq_done), 2) + thread.join() + + for i in range(5): + self.assertEqual(size_op.eval(), 5 - i) + results.append(deq.eval()) + self.assertEqual(size_op.eval(), 5 - i - 1) + + self.assertAllEqual(elem, results) + + def testBigDequeueMany(self): + with self.test_session() as sess: + q = tf.FIFOQueue(2, tf.int32, ((),)) + elem = range(4) + enq_list = [q.enqueue((e,)) for e in elem] + deq = q.dequeue_many(4) + + results = [] + def blocking_dequeue(): + # Will only complete after 4 enqueues complete. + results.extend(sess.run(deq)) + thread = self.checkedThread(target=blocking_dequeue) + thread.start() + # The dequeue should start and then block. + for enq in enq_list: + # TODO(mrry): Figure out how to do this without sleeping. + time.sleep(0.1) + self.assertEqual(len(results), 0) + sess.run(enq) + + # Enough enqueued to unblock the dequeue + thread.join() + self.assertAllEqual(elem, results) + + +if __name__ == "__main__": + tf.test.main() |