diff options
Diffstat (limited to 'tensorflow/python/kernel_tests/random_shuffle_queue_test.py')
-rw-r--r-- | tensorflow/python/kernel_tests/random_shuffle_queue_test.py | 1054 |
1 files changed, 1054 insertions, 0 deletions
diff --git a/tensorflow/python/kernel_tests/random_shuffle_queue_test.py b/tensorflow/python/kernel_tests/random_shuffle_queue_test.py new file mode 100644 index 0000000000..343ffdcb76 --- /dev/null +++ b/tensorflow/python/kernel_tests/random_shuffle_queue_test.py @@ -0,0 +1,1054 @@ +"""Tests for tensorflow.ops.data_flow_ops.Queue.""" +import random +import re +import time + +import tensorflow.python.platform + +import numpy as np +import tensorflow as tf + + +class RandomShuffleQueueTest(tf.test.TestCase): + + def setUp(self): + # Useful for debugging when a test times out. + super(RandomShuffleQueueTest, self).setUp() + tf.logging.error("Starting: %s", self._testMethodName) + + def tearDown(self): + super(RandomShuffleQueueTest, self).tearDown() + tf.logging.error("Finished: %s", self._testMethodName) + + def testEnqueue(self): + with self.test_session(): + q = tf.RandomShuffleQueue(10, 5, tf.float32) + enqueue_op = q.enqueue((10.0,)) + self.assertAllEqual(0, q.size().eval()) + enqueue_op.run() + self.assertAllEqual(1, q.size().eval()) + + def testEnqueueWithShape(self): + with self.test_session(): + q = tf.RandomShuffleQueue( + 10, 5, tf.float32, shapes=tf.TensorShape([3, 2])) + enqueue_correct_op = q.enqueue(([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]],)) + enqueue_correct_op.run() + self.assertAllEqual(1, q.size().eval()) + with self.assertRaises(ValueError): + q.enqueue(([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]],)) + + def testEnqueueManyWithShape(self): + with self.test_session(): + q = tf.RandomShuffleQueue( + 10, 5, [tf.int32, tf.int32], + shapes=[(), (2,)]) + q.enqueue_many([[1, 2, 3, 4], [[1, 1], [2, 2], [3, 3], [4, 4]]]).run() + self.assertAllEqual(4, q.size().eval()) + + q2 = tf.RandomShuffleQueue(10, 5, tf.int32, shapes=tf.TensorShape([3])) + q2.enqueue(([1, 2, 3],)) + q2.enqueue_many(([[1, 2, 3]],)) + + def testScalarShapes(self): + with self.test_session() as sess: + q = tf.RandomShuffleQueue( + 10, 0, [tf.int32, tf.int32], + shapes=[(), (1,)]) + q.enqueue_many([[1, 2, 3, 4], [[5], [6], [7], [8]]]).run() + q.enqueue([9, [10]]).run() + dequeue_t = q.dequeue() + results = [] + for _ in range(2): + a, b = sess.run(dequeue_t) + results.append((a, b)) + a, b = sess.run(q.dequeue_many(3)) + for i in range(3): + results.append((a[i], b[i])) + self.assertItemsEqual([(1, [5]), (2, [6]), (3, [7]), (4, [8]), (9, [10])], + results) + + def testParallelEnqueue(self): + with self.test_session() as sess: + q = tf.RandomShuffleQueue(10, 0, tf.float32) + elems = [10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] + enqueue_ops = [q.enqueue((x,)) for x in elems] + dequeued_t = q.dequeue() + + # Run one producer thread for each element in elems. + def enqueue(enqueue_op): + sess.run(enqueue_op) + threads = [self.checkedThread(target=enqueue, args=(e,)) + for e in enqueue_ops] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + # Dequeue every element using a single thread. + results = [] + for _ in xrange(len(elems)): + results.append(dequeued_t.eval()) + self.assertItemsEqual(elems, results) + + def testParallelDequeue(self): + with self.test_session() as sess: + q = tf.RandomShuffleQueue(10, 0, tf.float32) + elems = [10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] + enqueue_ops = [q.enqueue((x,)) for x in elems] + dequeued_t = q.dequeue() + + # Enqueue every element using a single thread. + for enqueue_op in enqueue_ops: + enqueue_op.run() + + # Run one consumer thread for each element in elems. + results = [] + + def dequeue(): + results.append(sess.run(dequeued_t)) + threads = [self.checkedThread(target=dequeue) for _ in enqueue_ops] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + self.assertItemsEqual(elems, results) + + def testDequeue(self): + with self.test_session(): + q = tf.RandomShuffleQueue(10, 0, tf.float32) + elems = [10.0, 20.0, 30.0] + enqueue_ops = [q.enqueue((x,)) for x in elems] + dequeued_t = q.dequeue() + + for enqueue_op in enqueue_ops: + enqueue_op.run() + + vals = [dequeued_t.eval() for _ in xrange(len(elems))] + self.assertItemsEqual(elems, vals) + + def testEnqueueAndBlockingDequeue(self): + with self.test_session() as sess: + q = tf.RandomShuffleQueue(3, 0, tf.float32) + elems = [10.0, 20.0, 30.0] + enqueue_ops = [q.enqueue((x,)) for x in elems] + dequeued_t = q.dequeue() + + def enqueue(): + # The enqueue_ops should run after the dequeue op has blocked. + # TODO(mrry): Figure out how to do this without sleeping. + time.sleep(0.1) + for enqueue_op in enqueue_ops: + sess.run(enqueue_op) + + results = [] + + def dequeue(): + for _ in xrange(len(elems)): + results.append(sess.run(dequeued_t)) + + enqueue_thread = self.checkedThread(target=enqueue) + dequeue_thread = self.checkedThread(target=dequeue) + enqueue_thread.start() + dequeue_thread.start() + enqueue_thread.join() + dequeue_thread.join() + + self.assertItemsEqual(elems, results) + + def testMultiEnqueueAndDequeue(self): + with self.test_session() as sess: + q = tf.RandomShuffleQueue( + 10, 0, (tf.int32, tf.float32)) + elems = [(5, 10.0), (10, 20.0), (15, 30.0)] + enqueue_ops = [q.enqueue((x, y)) for x, y in elems] + dequeued_t = q.dequeue() + + for enqueue_op in enqueue_ops: + enqueue_op.run() + + results = [] + for _ in xrange(len(elems)): + x, y = sess.run(dequeued_t) + results.append((x, y)) + self.assertItemsEqual(elems, results) + + def testQueueSizeEmpty(self): + with self.test_session(): + q = tf.RandomShuffleQueue(10, 5, tf.float32) + self.assertEqual(0, q.size().eval()) + + def testQueueSizeAfterEnqueueAndDequeue(self): + with self.test_session(): + q = tf.RandomShuffleQueue(10, 0, tf.float32) + enqueue_op = q.enqueue((10.0,)) + dequeued_t = q.dequeue() + size = q.size() + self.assertEqual([], size.get_shape()) + + enqueue_op.run() + self.assertEqual([1], size.eval()) + dequeued_t.op.run() + self.assertEqual([0], size.eval()) + + def testEnqueueMany(self): + with self.test_session(): + q = tf.RandomShuffleQueue(10, 0, tf.float32) + elems = [10.0, 20.0, 30.0, 40.0] + enqueue_op = q.enqueue_many((elems,)) + dequeued_t = q.dequeue() + enqueue_op.run() + enqueue_op.run() + + results = [] + for _ in range(8): + results.append(dequeued_t.eval()) + self.assertItemsEqual(elems + elems, results) + + def testEmptyEnqueueMany(self): + with self.test_session(): + q = tf.RandomShuffleQueue(10, 5, tf.float32) + empty_t = tf.constant([], dtype=tf.float32, + shape=[0, 2, 3]) + enqueue_op = q.enqueue_many((empty_t,)) + size_t = q.size() + + self.assertEqual(0, size_t.eval()) + enqueue_op.run() + self.assertEqual(0, size_t.eval()) + + def testEmptyDequeueMany(self): + with self.test_session(): + q = tf.RandomShuffleQueue(10, 0, tf.float32, shapes=()) + enqueue_op = q.enqueue((10.0,)) + dequeued_t = q.dequeue_many(0) + + self.assertEqual([], dequeued_t.eval().tolist()) + enqueue_op.run() + self.assertEqual([], dequeued_t.eval().tolist()) + + def testEmptyDequeueManyWithNoShape(self): + with self.test_session(): + q = tf.RandomShuffleQueue(10, 0, tf.float32) + enqueue_op = q.enqueue( + (tf.constant([10.0, 20.0], shape=(1, 2)),)) + dequeued_t = q.dequeue_many(0) + + # Expect the operation to fail due to the shape not being constrained. + with self.assertRaisesOpError( + "requires the components to have specified shapes"): + dequeued_t.eval() + + enqueue_op.run() + + # Unlike tf.Queue, RandomShuffleQueue does not make any + # attempt to support DequeueMany with unspecified shapes, even if + # a shape could be inferred from the elements enqueued. + with self.assertRaisesOpError( + "requires the components to have specified shapes"): + dequeued_t.eval() + + def testMultiEnqueueMany(self): + with self.test_session() as sess: + q = tf.RandomShuffleQueue( + 10, 0, (tf.float32, tf.int32)) + float_elems = [10.0, 20.0, 30.0, 40.0] + int_elems = [[1, 2], [3, 4], [5, 6], [7, 8]] + enqueue_op = q.enqueue_many((float_elems, int_elems)) + dequeued_t = q.dequeue() + + enqueue_op.run() + enqueue_op.run() + + results = [] + for _ in range(8): + float_val, int_val = sess.run(dequeued_t) + results.append((float_val, [int_val[0], int_val[1]])) + expected = zip(float_elems, int_elems) + zip(float_elems, int_elems) + self.assertItemsEqual(expected, results) + + def testDequeueMany(self): + with self.test_session(): + q = tf.RandomShuffleQueue(10, 0, tf.float32, ((),)) + elems = [10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] + enqueue_op = q.enqueue_many((elems,)) + dequeued_t = q.dequeue_many(5) + + enqueue_op.run() + + results = dequeued_t.eval().tolist() + results.extend(dequeued_t.eval()) + self.assertItemsEqual(elems, results) + + def testMultiDequeueMany(self): + with self.test_session() as sess: + q = tf.RandomShuffleQueue( + 10, 0, (tf.float32, tf.int32), + shapes=((), (2,))) + float_elems = [ + 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] + int_elems = [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10], + [11, 12], [13, 14], [15, 16], [17, 18], [19, 20]] + enqueue_op = q.enqueue_many((float_elems, int_elems)) + dequeued_t = q.dequeue_many(4) + dequeued_single_t = q.dequeue() + + enqueue_op.run() + + results = [] + float_val, int_val = sess.run(dequeued_t) + self.assertEqual(float_val.shape, dequeued_t[0].get_shape()) + self.assertEqual(int_val.shape, dequeued_t[1].get_shape()) + results.extend(zip(float_val, int_val.tolist())) + + float_val, int_val = sess.run(dequeued_t) + results.extend(zip(float_val, int_val.tolist())) + + float_val, int_val = sess.run(dequeued_single_t) + self.assertEqual(float_val.shape, dequeued_single_t[0].get_shape()) + self.assertEqual(int_val.shape, dequeued_single_t[1].get_shape()) + results.append((float_val, int_val.tolist())) + + float_val, int_val = sess.run(dequeued_single_t) + results.append((float_val, int_val.tolist())) + + self.assertItemsEqual(zip(float_elems, int_elems), results) + + def testHighDimension(self): + with self.test_session(): + q = tf.RandomShuffleQueue( + 10, 0, tf.int32, ((4, 4, 4, 4))) + elems = np.array([[[[[x] * 4] * 4] * 4] * 4 for x in range(10)], np.int32) + enqueue_op = q.enqueue_many((elems,)) + dequeued_t = q.dequeue_many(10) + + enqueue_op.run() + self.assertItemsEqual(dequeued_t.eval().tolist(), elems.tolist()) + + def testParallelEnqueueMany(self): + with self.test_session() as sess: + q = tf.RandomShuffleQueue(1000, 0, tf.float32, shapes=()) + elems = [10.0 * x for x in range(100)] + enqueue_op = q.enqueue_many((elems,)) + dequeued_t = q.dequeue_many(1000) + + # Enqueue 100 items in parallel on 10 threads. + def enqueue(): + sess.run(enqueue_op) + threads = [self.checkedThread(target=enqueue) for _ in range(10)] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + self.assertItemsEqual(dequeued_t.eval(), elems * 10) + + def testParallelDequeueMany(self): + with self.test_session() as sess: + q = tf.RandomShuffleQueue(1000, 0, tf.float32, shapes=()) + elems = [10.0 * x for x in range(1000)] + enqueue_op = q.enqueue_many((elems,)) + dequeued_t = q.dequeue_many(100) + + enqueue_op.run() + + # Dequeue 100 items in parallel on 10 threads. + dequeued_elems = [] + + def dequeue(): + dequeued_elems.extend(sess.run(dequeued_t)) + threads = [self.checkedThread(target=dequeue) for _ in range(10)] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + self.assertItemsEqual(elems, dequeued_elems) + + def testBlockingDequeueMany(self): + with self.test_session() as sess: + q = tf.RandomShuffleQueue(10, 0, tf.float32, ((),)) + elems = [10.0, 20.0, 30.0, 40.0] + enqueue_op = q.enqueue_many((elems,)) + dequeued_t = q.dequeue_many(4) + + dequeued_elems = [] + + def enqueue(): + # The enqueue_op should run after the dequeue op has blocked. + # TODO(mrry): Figure out how to do this without sleeping. + time.sleep(0.1) + sess.run(enqueue_op) + + def dequeue(): + dequeued_elems.extend(sess.run(dequeued_t).tolist()) + + enqueue_thread = self.checkedThread(target=enqueue) + dequeue_thread = self.checkedThread(target=dequeue) + enqueue_thread.start() + dequeue_thread.start() + enqueue_thread.join() + dequeue_thread.join() + + self.assertItemsEqual(elems, dequeued_elems) + + def testDequeueManyWithTensorParameter(self): + with self.test_session(): + # Define a first queue that contains integer counts. + dequeue_counts = [random.randint(1, 10) for _ in range(100)] + count_q = tf.RandomShuffleQueue(100, 0, tf.int32) + enqueue_counts_op = count_q.enqueue_many((dequeue_counts,)) + total_count = sum(dequeue_counts) + + # Define a second queue that contains total_count elements. + elems = [random.randint(0, 100) for _ in range(total_count)] + q = tf.RandomShuffleQueue( + total_count, 0, tf.int32, ((),)) + enqueue_elems_op = q.enqueue_many((elems,)) + + # Define a subgraph that first dequeues a count, then DequeuesMany + # that number of elements. + dequeued_t = q.dequeue_many(count_q.dequeue()) + + enqueue_counts_op.run() + enqueue_elems_op.run() + + dequeued_elems = [] + for _ in dequeue_counts: + dequeued_elems.extend(dequeued_t.eval()) + self.assertItemsEqual(elems, dequeued_elems) + + def testDequeueFromClosedQueue(self): + with self.test_session(): + q = tf.RandomShuffleQueue(10, 2, tf.float32) + elems = [10.0, 20.0, 30.0, 40.0] + enqueue_op = q.enqueue_many((elems,)) + close_op = q.close() + dequeued_t = q.dequeue() + + enqueue_op.run() + close_op.run() + results = [dequeued_t.eval() for _ in elems] + expected = [[elem] for elem in elems] + self.assertItemsEqual(expected, results) + + # Expect the operation to fail due to the queue being closed. + with self.assertRaisesRegexp(tf.errors.OutOfRangeError, + "is closed and has insufficient"): + dequeued_t.eval() + + def testBlockingDequeueFromClosedQueue(self): + with self.test_session() as sess: + q = tf.RandomShuffleQueue(10, 2, tf.float32) + elems = [10.0, 20.0, 30.0, 40.0] + enqueue_op = q.enqueue_many((elems,)) + close_op = q.close() + dequeued_t = q.dequeue() + + enqueue_op.run() + + results = [] + def dequeue(): + for _ in elems: + results.append(sess.run(dequeued_t)) + self.assertItemsEqual(elems, results) + # Expect the operation to fail due to the queue being closed. + with self.assertRaisesRegexp(tf.errors.OutOfRangeError, + "is closed and has insufficient"): + sess.run(dequeued_t) + + dequeue_thread = self.checkedThread(target=dequeue) + dequeue_thread.start() + # The close_op should run after the dequeue_thread has blocked. + # TODO(mrry): Figure out how to do this without sleeping. + time.sleep(0.1) + # The dequeue thread blocked when it hit the min_size requirement. + self.assertEqual(len(results), 2) + close_op.run() + dequeue_thread.join() + # Once the queue is closed, the min_size requirement is lifted. + self.assertEqual(len(results), 4) + + def testBlockingDequeueFromClosedEmptyQueue(self): + with self.test_session() as sess: + q = tf.RandomShuffleQueue(10, 0, tf.float32) + close_op = q.close() + dequeued_t = q.dequeue() + + finished = [] # Needs to be a mutable type + def dequeue(): + # Expect the operation to fail due to the queue being closed. + with self.assertRaisesRegexp(tf.errors.OutOfRangeError, + "is closed and has insufficient"): + sess.run(dequeued_t) + finished.append(True) + + dequeue_thread = self.checkedThread(target=dequeue) + dequeue_thread.start() + # The close_op should run after the dequeue_thread has blocked. + # TODO(mrry): Figure out how to do this without sleeping. + time.sleep(0.1) + self.assertEqual(len(finished), 0) + close_op.run() + dequeue_thread.join() + self.assertEqual(len(finished), 1) + + def testBlockingDequeueManyFromClosedQueue(self): + with self.test_session() as sess: + q = tf.RandomShuffleQueue(10, 0, tf.float32, ((),)) + elems = [10.0, 20.0, 30.0, 40.0] + enqueue_op = q.enqueue_many((elems,)) + close_op = q.close() + dequeued_t = q.dequeue_many(4) + + enqueue_op.run() + + progress = [] # Must be mutable + def dequeue(): + self.assertItemsEqual(elems, sess.run(dequeued_t)) + progress.append(1) + # Expect the operation to fail due to the queue being closed. + with self.assertRaisesRegexp(tf.errors.OutOfRangeError, + "is closed and has insufficient"): + sess.run(dequeued_t) + progress.append(2) + + self.assertEqual(len(progress), 0) + dequeue_thread = self.checkedThread(target=dequeue) + dequeue_thread.start() + # The close_op should run after the dequeue_thread has blocked. + # TODO(mrry): Figure out how to do this without sleeping. + for _ in range(100): + time.sleep(0.01) + if len(progress) == 1: break + self.assertEqual(len(progress), 1) + time.sleep(0.01) + close_op.run() + dequeue_thread.join() + self.assertEqual(len(progress), 2) + + def testBlockingDequeueManyFromClosedQueueWithElementsRemaining(self): + with self.test_session() as sess: + q = tf.RandomShuffleQueue(10, 0, tf.float32, ((),)) + elems = [10.0, 20.0, 30.0, 40.0] + enqueue_op = q.enqueue_many((elems,)) + close_op = q.close() + dequeued_t = q.dequeue_many(3) + cleanup_dequeue_t = q.dequeue_many(q.size()) + + enqueue_op.run() + + results = [] + def dequeue(): + results.extend(sess.run(dequeued_t)) + # Expect the operation to fail due to the queue being closed. + with self.assertRaisesRegexp(tf.errors.OutOfRangeError, + "is closed and has insufficient"): + sess.run(dequeued_t) + # However, the last result was dequeued before the queue was closed, + # so nothing more is added to results. + results.extend(sess.run(cleanup_dequeue_t)) + + dequeue_thread = self.checkedThread(target=dequeue) + dequeue_thread.start() + # The close_op should run after the dequeue_thread has blocked. + # TODO(mrry): Figure out how to do this without sleeping. + time.sleep(0.1) + self.assertEqual(len(results), 3) + close_op.run() + dequeue_thread.join() + self.assertEqual(len(results), 3) + + def testBlockingDequeueManyFromClosedEmptyQueue(self): + with self.test_session() as sess: + q = tf.RandomShuffleQueue(10, 5, tf.float32, ((),)) + close_op = q.close() + dequeued_t = q.dequeue_many(4) + + def dequeue(): + # Expect the operation to fail due to the queue being closed. + with self.assertRaisesRegexp(tf.errors.OutOfRangeError, + "is closed and has insufficient"): + sess.run(dequeued_t) + + dequeue_thread = self.checkedThread(target=dequeue) + dequeue_thread.start() + # The close_op should run after the dequeue_thread has blocked. + # TODO(mrry): Figure out how to do this without sleeping. + time.sleep(0.1) + close_op.run() + dequeue_thread.join() + + def testEnqueueToClosedQueue(self): + with self.test_session(): + q = tf.RandomShuffleQueue(10, 4, tf.float32) + enqueue_op = q.enqueue((10.0,)) + close_op = q.close() + + enqueue_op.run() + close_op.run() + + # Expect the operation to fail due to the queue being closed. + with self.assertRaisesRegexp(tf.errors.AbortedError, "is closed"): + enqueue_op.run() + + def testEnqueueManyToClosedQueue(self): + with self.test_session(): + q = tf.RandomShuffleQueue(10, 5, tf.float32, ((),)) + elems = [10.0, 20.0, 30.0, 40.0] + enqueue_op = q.enqueue_many((elems,)) + close_op = q.close() + + enqueue_op.run() + close_op.run() + + # Expect the operation to fail due to the queue being closed. + with self.assertRaisesRegexp(tf.errors.AbortedError, "is closed"): + enqueue_op.run() + + def testBlockingEnqueueToFullQueue(self): + with self.test_session() as sess: + q = tf.RandomShuffleQueue(4, 0, tf.float32, ((),)) + elems = [10.0, 20.0, 30.0, 40.0] + enqueue_op = q.enqueue_many((elems,)) + blocking_enqueue_op = q.enqueue((50.0,)) + dequeued_t = q.dequeue() + + enqueue_op.run() + + def blocking_enqueue(): + sess.run(blocking_enqueue_op) + thread = self.checkedThread(target=blocking_enqueue) + thread.start() + # The dequeue ops should run after the blocking_enqueue_op has blocked. + # TODO(mrry): Figure out how to do this without sleeping. + time.sleep(0.1) + results = [] + for _ in elems: + results.append(dequeued_t.eval()) + results.append(dequeued_t.eval()) + self.assertItemsEqual(elems + [50.0], results) + # There wasn't room for 50.0 in the queue when the first element was + # dequeued. + self.assertNotEqual(50.0, results[0]) + thread.join() + + def testBlockingEnqueueManyToFullQueue(self): + with self.test_session() as sess: + q = tf.RandomShuffleQueue(4, 0, tf.float32, ((),)) + elems = [10.0, 20.0, 30.0, 40.0] + enqueue_op = q.enqueue_many((elems,)) + blocking_enqueue_op = q.enqueue_many(([50.0, 60.0],)) + dequeued_t = q.dequeue() + + enqueue_op.run() + + def blocking_enqueue(): + sess.run(blocking_enqueue_op) + thread = self.checkedThread(target=blocking_enqueue) + thread.start() + # The dequeue ops should run after the blocking_enqueue_op has blocked. + # TODO(mrry): Figure out how to do this without sleeping. + time.sleep(0.1) + + results = [] + for _ in elems: + time.sleep(0.01) + results.append(dequeued_t.eval()) + results.append(dequeued_t.eval()) + results.append(dequeued_t.eval()) + self.assertItemsEqual(elems + [50.0, 60.0], results) + # There wasn't room for 50.0 or 60.0 in the queue when the first + # element was dequeued. + self.assertNotEqual(50.0, results[0]) + self.assertNotEqual(60.0, results[0]) + # Similarly for 60.0 and the second element. + self.assertNotEqual(60.0, results[1]) + + def testBlockingEnqueueToClosedQueue(self): + with self.test_session() as sess: + q = tf.RandomShuffleQueue(4, 0, tf.float32, ((),)) + elems = [10.0, 20.0, 30.0, 40.0] + enqueue_op = q.enqueue_many((elems,)) + blocking_enqueue_op = q.enqueue((50.0,)) + dequeued_t = q.dequeue() + close_op = q.close() + + enqueue_op.run() + + def blocking_enqueue(): + # Expect the operation to succeed since it will complete + # before the queue is closed. + sess.run(blocking_enqueue_op) + + # Expect the operation to fail due to the queue being closed. + with self.assertRaisesRegexp(tf.errors.AbortedError, "closed"): + sess.run(blocking_enqueue_op) + thread1 = self.checkedThread(target=blocking_enqueue) + thread1.start() + + # The close_op should run after the first blocking_enqueue_op has blocked. + # TODO(mrry): Figure out how to do this without sleeping. + time.sleep(0.1) + + def blocking_close(): + sess.run(close_op) + thread2 = self.checkedThread(target=blocking_close) + thread2.start() + + # Wait for the close op to block before unblocking the enqueue. + # TODO(mrry): Figure out how to do this without sleeping. + time.sleep(0.1) + + results = [] + # Dequeue to unblock the first blocking_enqueue_op, after which the + # close will complete. + results.append(dequeued_t.eval()) + self.assertTrue(results[0] in elems) + thread2.join() + thread1.join() + + def testBlockingEnqueueManyToClosedQueue(self): + with self.test_session() as sess: + q = tf.RandomShuffleQueue(4, 0, tf.float32, ((),)) + elems = [10.0, 20.0, 30.0] + enqueue_op = q.enqueue_many((elems,)) + blocking_enqueue_op = q.enqueue_many(([50.0, 60.0],)) + close_op = q.close() + size_t = q.size() + + enqueue_op.run() + self.assertEqual(size_t.eval(), 3) + + def blocking_enqueue(): + # This will block until the dequeue after the close. + sess.run(blocking_enqueue_op) + # At this point the close operation will become unblocked, so the + # next enqueue will fail. + with self.assertRaisesRegexp(tf.errors.AbortedError, "closed"): + sess.run(blocking_enqueue_op) + thread1 = self.checkedThread(target=blocking_enqueue) + thread1.start() + # The close_op should run after the blocking_enqueue_op has blocked. + # TODO(mrry): Figure out how to do this without sleeping. + time.sleep(0.1) + # First blocking_enqueue_op of blocking_enqueue has enqueued 1 of 2 + # elements, and is blocked waiting for one more element to be dequeue. + self.assertEqual(size_t.eval(), 4) + + def blocking_close(): + sess.run(close_op) + thread2 = self.checkedThread(target=blocking_close) + thread2.start() + + # The close_op should run before the second blocking_enqueue_op + # has started. + # TODO(mrry): Figure out how to do this without sleeping. + time.sleep(0.1) + + # Unblock the first blocking_enqueue_op in blocking_enqueue. + q.dequeue().eval() + + thread2.join() + thread1.join() + + def testSharedQueueSameSession(self): + with self.test_session(): + q1 = tf.RandomShuffleQueue( + 1, 0, tf.float32, ((),), shared_name="shared_queue") + q1.enqueue((10.0,)).run() + + q2 = tf.RandomShuffleQueue( + 1, 0, tf.float32, ((),), shared_name="shared_queue") + + q1_size_t = q1.size() + q2_size_t = q2.size() + + self.assertEqual(q1_size_t.eval(), 1) + self.assertEqual(q2_size_t.eval(), 1) + + self.assertEqual(q2.dequeue().eval(), 10.0) + + self.assertEqual(q1_size_t.eval(), 0) + self.assertEqual(q2_size_t.eval(), 0) + + q2.enqueue((20.0,)).run() + + self.assertEqual(q1_size_t.eval(), 1) + self.assertEqual(q2_size_t.eval(), 1) + + self.assertEqual(q1.dequeue().eval(), 20.0) + + self.assertEqual(q1_size_t.eval(), 0) + self.assertEqual(q2_size_t.eval(), 0) + + def testIncompatibleSharedQueueErrors(self): + with self.test_session(): + q_a_1 = tf.RandomShuffleQueue( + 10, 5, tf.float32, shared_name="q_a") + q_a_2 = tf.RandomShuffleQueue( + 15, 5, tf.float32, shared_name="q_a") + q_a_1.queue_ref.eval() + with self.assertRaisesOpError("capacity"): + q_a_2.queue_ref.eval() + + q_b_1 = tf.RandomShuffleQueue( + 10, 0, tf.float32, shared_name="q_b") + q_b_2 = tf.RandomShuffleQueue( + 10, 5, tf.float32, shared_name="q_b") + q_b_1.queue_ref.eval() + with self.assertRaisesOpError("min_after_dequeue"): + q_b_2.queue_ref.eval() + + q_c_1 = tf.RandomShuffleQueue( + 10, 5, tf.float32, shared_name="q_c") + q_c_2 = tf.RandomShuffleQueue( + 10, 5, tf.int32, shared_name="q_c") + q_c_1.queue_ref.eval() + with self.assertRaisesOpError("component types"): + q_c_2.queue_ref.eval() + + q_d_1 = tf.RandomShuffleQueue( + 10, 5, tf.float32, shared_name="q_d") + q_d_2 = tf.RandomShuffleQueue( + 10, 5, tf.float32, shapes=[(1, 1, 2, 3)], shared_name="q_d") + q_d_1.queue_ref.eval() + with self.assertRaisesOpError("component shapes"): + q_d_2.queue_ref.eval() + + q_e_1 = tf.RandomShuffleQueue( + 10, 5, tf.float32, shapes=[(1, 1, 2, 3)], shared_name="q_e") + q_e_2 = tf.RandomShuffleQueue( + 10, 5, tf.float32, shared_name="q_e") + q_e_1.queue_ref.eval() + with self.assertRaisesOpError("component shapes"): + q_e_2.queue_ref.eval() + + q_f_1 = tf.RandomShuffleQueue( + 10, 5, tf.float32, shapes=[(1, 1, 2, 3)], shared_name="q_f") + q_f_2 = tf.RandomShuffleQueue( + 10, 5, tf.float32, shapes=[(1, 1, 2, 4)], shared_name="q_f") + q_f_1.queue_ref.eval() + with self.assertRaisesOpError("component shapes"): + q_f_2.queue_ref.eval() + + q_g_1 = tf.RandomShuffleQueue( + 10, 5, tf.float32, shared_name="q_g") + q_g_2 = tf.RandomShuffleQueue( + 10, 5, (tf.float32, tf.int32), shared_name="q_g") + q_g_1.queue_ref.eval() + with self.assertRaisesOpError("component types"): + q_g_2.queue_ref.eval() + + q_h_1 = tf.RandomShuffleQueue( + 10, 5, tf.float32, seed=12, shared_name="q_h") + q_h_2 = tf.RandomShuffleQueue( + 10, 5, tf.float32, seed=21, shared_name="q_h") + q_h_1.queue_ref.eval() + with self.assertRaisesOpError("random seeds"): + q_h_2.queue_ref.eval() + + def testSelectQueue(self): + with self.test_session(): + num_queues = 10 + qlist = list() + for _ in xrange(num_queues): + qlist.append( + tf.RandomShuffleQueue(10, 0, tf.float32)) + # Enqueue/Dequeue into a dynamically selected queue + for _ in xrange(20): + index = np.random.randint(num_queues) + q = tf.RandomShuffleQueue.from_list(index, qlist) + q.enqueue((10.,)).run() + self.assertEqual(q.dequeue().eval(), 10.0) + + def testSelectQueueOutOfRange(self): + with self.test_session(): + q1 = tf.RandomShuffleQueue(10, 0, tf.float32) + q2 = tf.RandomShuffleQueue(15, 0, tf.float32) + enq_q = tf.RandomShuffleQueue.from_list(3, [q1, q2]) + with self.assertRaisesOpError("Index must be in the range"): + enq_q.dequeue().eval() + + def _blockingDequeue(self, sess, dequeue_op): + with self.assertRaisesOpError("Dequeue operation was cancelled"): + sess.run(dequeue_op) + + def _blockingDequeueMany(self, sess, dequeue_many_op): + with self.assertRaisesOpError("Dequeue operation was cancelled"): + sess.run(dequeue_many_op) + + def _blockingEnqueue(self, sess, enqueue_op): + with self.assertRaisesOpError("Enqueue operation was cancelled"): + sess.run(enqueue_op) + + def _blockingEnqueueMany(self, sess, enqueue_many_op): + with self.assertRaisesOpError("Enqueue operation was cancelled"): + sess.run(enqueue_many_op) + + def testResetOfBlockingOperation(self): + with self.test_session() as sess: + q_empty = tf.RandomShuffleQueue( + 5, 0, tf.float32, ((),)) + dequeue_op = q_empty.dequeue() + dequeue_many_op = q_empty.dequeue_many(1) + + q_full = tf.RandomShuffleQueue(5, 0, tf.float32, ((),)) + sess.run(q_full.enqueue_many(([1.0, 2.0, 3.0, 4.0, 5.0],))) + enqueue_op = q_full.enqueue((6.0,)) + enqueue_many_op = q_full.enqueue_many(([6.0],)) + + threads = [ + self.checkedThread(self._blockingDequeue, args=(sess, dequeue_op)), + self.checkedThread(self._blockingDequeueMany, args=(sess, + dequeue_many_op)), + self.checkedThread(self._blockingEnqueue, args=(sess, enqueue_op)), + self.checkedThread(self._blockingEnqueueMany, args=(sess, + enqueue_many_op))] + for t in threads: + t.start() + time.sleep(0.1) + sess.close() # Will cancel the blocked operations. + for t in threads: + t.join() + + def testDequeueManyInDifferentOrders(self): + with self.test_session(): + # Specify seeds to make the test deterministic + # (https://en.wikipedia.org/wiki/Taxicab_number). + q1 = tf.RandomShuffleQueue(10, 5, tf.int32, + ((),), seed=1729) + q2 = tf.RandomShuffleQueue(10, 5, tf.int32, + ((),), seed=87539319) + enq1 = q1.enqueue_many(([1, 2, 3, 4, 5],)) + enq2 = q2.enqueue_many(([1, 2, 3, 4, 5],)) + deq1 = q1.dequeue_many(5) + deq2 = q2.dequeue_many(5) + + enq1.run() + enq1.run() + enq2.run() + enq2.run() + + results = [[], [], [], []] + + results[0].extend(deq1.eval()) + results[1].extend(deq2.eval()) + + q1.close().run() + q2.close().run() + + results[2].extend(deq1.eval()) + results[3].extend(deq2.eval()) + + # No two should match + for i in range(1, 4): + for j in range(i): + self.assertNotEqual(results[i], results[j]) + + def testDequeueInDifferentOrders(self): + with self.test_session(): + # Specify seeds to make the test deterministic + # (https://en.wikipedia.org/wiki/Taxicab_number). + q1 = tf.RandomShuffleQueue(10, 5, tf.int32, + ((),), seed=1729) + q2 = tf.RandomShuffleQueue(10, 5, tf.int32, + ((),), seed=87539319) + enq1 = q1.enqueue_many(([1, 2, 3, 4, 5],)) + enq2 = q2.enqueue_many(([1, 2, 3, 4, 5],)) + deq1 = q1.dequeue() + deq2 = q2.dequeue() + + enq1.run() + enq1.run() + enq2.run() + enq2.run() + + results = [[], [], [], []] + + for _ in range(5): + results[0].append(deq1.eval()) + results[1].append(deq2.eval()) + + q1.close().run() + q2.close().run() + + for _ in range(5): + results[2].append(deq1.eval()) + results[3].append(deq2.eval()) + + # No two should match + for i in range(1, 4): + for j in range(i): + self.assertNotEqual(results[i], results[j]) + + def testBigEnqueueMany(self): + with self.test_session() as sess: + q = tf.RandomShuffleQueue( + 5, 0, tf.int32, ((),)) + elem = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + enq = q.enqueue_many((elem,)) + deq = q.dequeue() + size_op = q.size() + + enq_done = [] + def blocking_enqueue(): + enq_done.append(False) + # This will fill the queue and then block until enough dequeues happen. + sess.run(enq) + enq_done.append(True) + thread = self.checkedThread(target=blocking_enqueue) + thread.start() + + # The enqueue should start and then block. + results = [] + results.append(deq.eval()) # Will only complete after the enqueue starts. + self.assertEqual(len(enq_done), 1) + self.assertEqual(sess.run(size_op), 5) + + for _ in range(3): + results.append(deq.eval()) + + time.sleep(0.1) + self.assertEqual(len(enq_done), 1) + self.assertEqual(sess.run(size_op), 5) + + # This dequeue will unblock the thread. + results.append(deq.eval()) + time.sleep(0.1) + self.assertEqual(len(enq_done), 2) + thread.join() + + for i in range(5): + self.assertEqual(size_op.eval(), 5 - i) + results.append(deq.eval()) + self.assertEqual(size_op.eval(), 5 - i - 1) + + self.assertItemsEqual(elem, results) + + def testBigDequeueMany(self): + with self.test_session() as sess: + q = tf.RandomShuffleQueue(2, 0, tf.int32, ((),)) + elem = range(4) + enq_list = [q.enqueue((e,)) for e in elem] + deq = q.dequeue_many(4) + + results = [] + def blocking_dequeue(): + # Will only complete after 4 enqueues complete. + results.extend(sess.run(deq)) + thread = self.checkedThread(target=blocking_dequeue) + thread.start() + # The dequeue should start and then block. + for enq in enq_list: + # TODO(mrry): Figure out how to do this without sleeping. + time.sleep(0.1) + self.assertEqual(len(results), 0) + sess.run(enq) + + # Enough enqueued to unblock the dequeue + thread.join() + self.assertItemsEqual(elem, results) + + +if __name__ == "__main__": + tf.test.main() |