diff options
Diffstat (limited to 'tensorflow/python/kernel_tests/map_stage_op_test.py')
-rw-r--r-- | tensorflow/python/kernel_tests/map_stage_op_test.py | 556 |
1 files changed, 556 insertions, 0 deletions
diff --git a/tensorflow/python/kernel_tests/map_stage_op_test.py b/tensorflow/python/kernel_tests/map_stage_op_test.py new file mode 100644 index 0000000000..2d2169c310 --- /dev/null +++ b/tensorflow/python/kernel_tests/map_stage_op_test.py @@ -0,0 +1,556 @@ +# Copyright 2017 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from tensorflow.python.framework import errors +from tensorflow.python.framework import dtypes +from tensorflow.python.framework import ops +from tensorflow.python.ops import array_ops +from tensorflow.python.ops import data_flow_ops +from tensorflow.python.ops import math_ops +from tensorflow.python.platform import test + + +class MapStageTest(test.TestCase): + + def testSimple(self): + with ops.Graph().as_default() as G: + with ops.device('/cpu:0'): + x = array_ops.placeholder(dtypes.float32) + pi = array_ops.placeholder(dtypes.int64) + gi = array_ops.placeholder(dtypes.int64) + v = 2. * (array_ops.zeros([128, 128]) + x) + with ops.device(test.gpu_device_name()): + stager = data_flow_ops.MapStagingArea([dtypes.float32]) + stage = stager.put(pi, [v], [0]) + k, y = stager.get(gi) + y = math_ops.reduce_max(math_ops.matmul(y, y)) + + G.finalize() + + with self.test_session(use_gpu=True, graph=G) as sess: + sess.run(stage, feed_dict={x: -1, pi: 0}) + for i in range(10): + _, yval = sess.run([stage, y], feed_dict={x: i, pi: i+1, gi:i}) + self.assertAllClose(4 * (i - 1) * (i - 1) * 128, yval, rtol=1e-4) + + def testMultiple(self): + with ops.Graph().as_default() as G: + with ops.device('/cpu:0'): + x = array_ops.placeholder(dtypes.float32) + pi = array_ops.placeholder(dtypes.int64) + gi = array_ops.placeholder(dtypes.int64) + v = 2. * (array_ops.zeros([128, 128]) + x) + with ops.device(test.gpu_device_name()): + stager = data_flow_ops.MapStagingArea([dtypes.float32, dtypes.float32]) + stage = stager.put(pi, [x, v], [0, 1]) + k, (z, y) = stager.get(gi) + y = math_ops.reduce_max(z * math_ops.matmul(y, y)) + + G.finalize() + + with self.test_session(use_gpu=True, graph=G) as sess: + sess.run(stage, feed_dict={x: -1, pi: 0}) + for i in range(10): + _, yval = sess.run([stage, y], feed_dict={x: i, pi: i+1, gi:i}) + self.assertAllClose( + 4 * (i - 1) * (i - 1) * (i - 1) * 128, yval, rtol=1e-4) + + def testDictionary(self): + with ops.Graph().as_default() as G: + with ops.device('/cpu:0'): + x = array_ops.placeholder(dtypes.float32) + pi = array_ops.placeholder(dtypes.int64) + gi = array_ops.placeholder(dtypes.int64) + v = 2. * (array_ops.zeros([128, 128]) + x) + with ops.device(test.gpu_device_name()): + stager = data_flow_ops.MapStagingArea( + [dtypes.float32, dtypes.float32], + shapes=[[], [128, 128]], + names=['x', 'v']) + stage = stager.put(pi,{'x': x, 'v': v}) + key, ret = stager.get(gi) + z = ret['x'] + y = ret['v'] + y = math_ops.reduce_max(z * math_ops.matmul(y, y)) + + G.finalize() + + with self.test_session(use_gpu=True, graph=G) as sess: + sess.run(stage, feed_dict={x: -1, pi: 0}) + for i in range(10): + _, yval = sess.run([stage, y], feed_dict={x: i, pi: i+1, gi:i}) + self.assertAllClose( + 4 * (i - 1) * (i - 1) * (i - 1) * 128, yval, rtol=1e-4) + + def testColocation(self): + gpu_dev = test.gpu_device_name() + + with ops.Graph().as_default() as G: + with ops.device('/cpu:0'): + x = array_ops.placeholder(dtypes.float32) + v = 2. * (array_ops.zeros([128, 128]) + x) + with ops.device(gpu_dev): + stager = data_flow_ops.MapStagingArea([dtypes.float32]) + y = stager.put(1, [v], [0]) + self.assertEqual(y.device, '/device:GPU:0' if gpu_dev + else gpu_dev) + with ops.device('/cpu:0'): + _, x = stager.get(1) + y = stager.peek(1) + _, z = stager.get() + self.assertEqual(x.device, '/device:CPU:0') + self.assertEqual(y.device, '/device:CPU:0') + self.assertEqual(z.device, '/device:CPU:0') + + G.finalize() + + def testPeek(self): + with ops.Graph().as_default() as G: + with ops.device('/cpu:0'): + x = array_ops.placeholder(dtypes.int32, name='x') + pi = array_ops.placeholder(dtypes.int64) + gi = array_ops.placeholder(dtypes.int64) + p = array_ops.placeholder(dtypes.int32, name='p') + with ops.device(test.gpu_device_name()): + stager = data_flow_ops.MapStagingArea([dtypes.int32, ], shapes=[[]]) + stage = stager.put(pi,[x], [0]) + peek = stager.peek(gi) + size = stager.size() + + G.finalize() + + n = 10 + + with self.test_session(use_gpu=True, graph=G) as sess: + for i in range(n): + sess.run(stage, feed_dict={x:i, pi:i}) + + for i in range(n): + self.assertTrue(sess.run(peek, feed_dict={gi: i}) == i) + + self.assertTrue(sess.run(size) == 10) + + def testSizeAndClear(self): + with ops.Graph().as_default() as G: + with ops.device('/cpu:0'): + x = array_ops.placeholder(dtypes.float32, name='x') + pi = array_ops.placeholder(dtypes.int64) + gi = array_ops.placeholder(dtypes.int64) + v = 2. * (array_ops.zeros([128, 128]) + x) + with ops.device(test.gpu_device_name()): + stager = data_flow_ops.MapStagingArea( + [dtypes.float32, dtypes.float32], + shapes=[[], [128, 128]], + names=['x', 'v']) + stage = stager.put(pi,{'x': x, 'v': v}) + size = stager.size() + clear = stager.clear() + + G.finalize() + + with self.test_session(use_gpu=True, graph=G) as sess: + sess.run(stage, feed_dict={x: -1, pi: 3}) + self.assertEqual(sess.run(size), 1) + sess.run(stage, feed_dict={x: -1, pi: 1}) + self.assertEqual(sess.run(size), 2) + sess.run(clear) + self.assertEqual(sess.run(size), 0) + + + def testCapacity(self): + capacity = 3 + + with ops.Graph().as_default() as G: + with ops.device('/cpu:0'): + x = array_ops.placeholder(dtypes.int32, name='x') + pi = array_ops.placeholder(dtypes.int64, name='pi') + gi = array_ops.placeholder(dtypes.int64, name='gi') + with ops.device(test.gpu_device_name()): + stager = data_flow_ops.MapStagingArea([dtypes.int32, ], + capacity=capacity, shapes=[[]]) + + stage = stager.put(pi, [x], [0]) + get = stager.get() + size = stager.size() + + G.finalize() + + from six.moves import queue as Queue + import threading + + queue = Queue.Queue() + n = 5 + missed = 0 + + with self.test_session(use_gpu=True, graph=G) as sess: + # Stage data in a separate thread which will block + # when it hits the staging area's capacity and thus + # not fill the queue with n tokens + def thread_run(): + for i in range(n): + sess.run(stage, feed_dict={x: i, pi: i}) + queue.put(0) + + t = threading.Thread(target=thread_run) + t.start() + + # Get tokens from the queue, making notes of when we timeout + for i in range(n): + try: + queue.get(timeout=0.05) + except Queue.Empty: + missed += 1 + + # We timed out n - capacity times waiting for queue puts + self.assertTrue(missed == n - capacity) + + # Clear the staging area out a bit + for i in range(n - capacity): + sess.run(get) + + # This should now succeed + t.join() + + self.assertTrue(sess.run(size) == capacity) + + # Clear out the staging area completely + for i in range(capacity): + sess.run(get) + + def testMemoryLimit(self): + memory_limit = 512*1024 # 512K + chunk = 200*1024 # 256K + capacity = memory_limit // chunk + + with ops.Graph().as_default() as G: + with ops.device('/cpu:0'): + x = array_ops.placeholder(dtypes.uint8, name='x') + pi = array_ops.placeholder(dtypes.int64, name='pi') + gi = array_ops.placeholder(dtypes.int64, name='gi') + with ops.device(test.gpu_device_name()): + stager = data_flow_ops.MapStagingArea([dtypes.uint8], + memory_limit=memory_limit, shapes=[[]]) + stage = stager.put(pi, [x], [0]) + get = stager.get() + size = stager.size() + + G.finalize() + + from six.moves import queue as Queue + import threading + import numpy as np + + queue = Queue.Queue() + n = 5 + missed = 0 + + with self.test_session(use_gpu=True, graph=G) as sess: + # Stage data in a separate thread which will block + # when it hits the staging area's capacity and thus + # not fill the queue with n tokens + def thread_run(): + for i in range(n): + sess.run(stage, feed_dict={x: np.full(chunk, i, dtype=np.uint8), + pi: i}) + queue.put(0) + + t = threading.Thread(target=thread_run) + t.start() + + # Get tokens from the queue, making notes of when we timeout + for i in range(n): + try: + queue.get(timeout=0.05) + except Queue.Empty: + missed += 1 + + # We timed out n - capacity times waiting for queue puts + self.assertTrue(missed == n - capacity) + + # Clear the staging area out a bit + for i in range(n - capacity): + sess.run(get) + + # This should now succeed + t.join() + + self.assertTrue(sess.run(size) == capacity) + + # Clear out the staging area completely + for i in range(capacity): + sess.run(get) + + def testOrdering(self): + import six + import random + + with ops.Graph().as_default() as G: + with ops.device('/cpu:0'): + x = array_ops.placeholder(dtypes.int32, name='x') + pi = array_ops.placeholder(dtypes.int64, name='pi') + gi = array_ops.placeholder(dtypes.int64, name='gi') + with ops.device(test.gpu_device_name()): + stager = data_flow_ops.MapStagingArea([dtypes.int32, ], + shapes=[[]], ordered=True) + stage = stager.put(pi, [x], [0]) + get = stager.get() + size = stager.size() + + G.finalize() + + n = 10 + + with self.test_session(use_gpu=True, graph=G) as sess: + # Keys n-1..0 + keys = list(reversed(six.moves.range(n))) + + for i in keys: + sess.run(stage, feed_dict={pi: i, x: i}) + + self.assertTrue(sess.run(size) == n) + + # Check that key, values come out in ascending order + for i, k in enumerate(reversed(keys)): + get_key, values = sess.run(get) + self.assertTrue(i == k == get_key == values) + + self.assertTrue(sess.run(size) == 0) + + def testPartialDictInsert(self): + with ops.Graph().as_default() as G: + with ops.device('/cpu:0'): + x = array_ops.placeholder(dtypes.float32) + f = array_ops.placeholder(dtypes.float32) + v = array_ops.placeholder(dtypes.float32) + pi = array_ops.placeholder(dtypes.int64) + gi = array_ops.placeholder(dtypes.int64) + with ops.device(test.gpu_device_name()): + # Test barrier with dictionary + stager = data_flow_ops.MapStagingArea( + [dtypes.float32, dtypes.float32, dtypes.float32], + names=['x', 'v', 'f']) + stage_xf = stager.put(pi,{'x': x, 'f': f}) + stage_v = stager.put(pi, {'v': v}) + key, ret = stager.get(gi) + size = stager.size() + isize = stager.incomplete_size() + + G.finalize() + + with self.test_session(use_gpu=True, graph=G) as sess: + # 0 complete and incomplete entries + self.assertTrue(sess.run([size, isize]) == [0, 0]) + # Stage key 0, x and f tuple entries + sess.run(stage_xf, feed_dict={pi: 0, x: 1, f: 2}) + self.assertTrue(sess.run([size, isize]) == [0, 1]) + # Stage key 1, x and f tuple entries + sess.run(stage_xf, feed_dict={pi: 1, x: 1, f: 2}) + self.assertTrue(sess.run([size, isize]) == [0, 2]) + + # Now complete key 0 with tuple entry v + sess.run(stage_v, feed_dict={pi: 0, v: 1}) + # 1 complete and 1 incomplete entry + self.assertTrue(sess.run([size, isize]) == [1, 1]) + # We can now obtain tuple associated with key 0 + self.assertTrue(sess.run([key, ret], feed_dict={gi:0}) + == [0, { 'x':1, 'f':2, 'v':1}]) + + # 0 complete and 1 incomplete entry + self.assertTrue(sess.run([size, isize]) == [0, 1]) + # Now complete key 1 with tuple entry v + sess.run(stage_v, feed_dict={pi: 1, v: 3}) + # We can now obtain tuple associated with key 1 + self.assertTrue(sess.run([key, ret], feed_dict={gi:1}) + == [1, { 'x':1, 'f':2, 'v':3}]) + + def testPartialIndexInsert(self): + with ops.Graph().as_default() as G: + with ops.device('/cpu:0'): + x = array_ops.placeholder(dtypes.float32) + f = array_ops.placeholder(dtypes.float32) + v = array_ops.placeholder(dtypes.float32) + pi = array_ops.placeholder(dtypes.int64) + gi = array_ops.placeholder(dtypes.int64) + with ops.device(test.gpu_device_name()): + stager = data_flow_ops.MapStagingArea( + [dtypes.float32, dtypes.float32, dtypes.float32]) + stage_xf = stager.put(pi, [x, f], [0, 2]) + stage_v = stager.put(pi, [v], [1]) + key, ret = stager.get(gi) + size = stager.size() + isize = stager.incomplete_size() + + G.finalize() + + with self.test_session(use_gpu=True, graph=G) as sess: + # 0 complete and incomplete entries + self.assertTrue(sess.run([size, isize]) == [0, 0]) + # Stage key 0, x and f tuple entries + sess.run(stage_xf, feed_dict={pi: 0, x: 1, f: 2}) + self.assertTrue(sess.run([size, isize]) == [0, 1]) + # Stage key 1, x and f tuple entries + sess.run(stage_xf, feed_dict={pi: 1, x: 1, f: 2}) + self.assertTrue(sess.run([size, isize]) == [0, 2]) + + # Now complete key 0 with tuple entry v + sess.run(stage_v, feed_dict={pi: 0, v: 1}) + # 1 complete and 1 incomplete entry + self.assertTrue(sess.run([size, isize]) == [1, 1]) + # We can now obtain tuple associated with key 0 + self.assertTrue(sess.run([key, ret], feed_dict={gi:0}) + == [0, [1, 1, 2]]) + + # 0 complete and 1 incomplete entry + self.assertTrue(sess.run([size, isize]) == [0, 1]) + # Now complete key 1 with tuple entry v + sess.run(stage_v, feed_dict={pi: 1, v: 3}) + # We can now obtain tuple associated with key 1 + self.assertTrue(sess.run([key, ret], feed_dict={gi:1}) + == [1, [1,3, 2]]) + + def testPartialDictGetsAndPeeks(self): + with ops.Graph().as_default() as G: + with ops.device('/cpu:0'): + x = array_ops.placeholder(dtypes.float32) + f = array_ops.placeholder(dtypes.float32) + v = array_ops.placeholder(dtypes.float32) + pi = array_ops.placeholder(dtypes.int64) + pei = array_ops.placeholder(dtypes.int64) + gi = array_ops.placeholder(dtypes.int64) + with ops.device(test.gpu_device_name()): + # Test barrier with dictionary + stager = data_flow_ops.MapStagingArea( + [dtypes.float32, dtypes.float32, dtypes.float32], + names=['x', 'v', 'f']) + stage_xf = stager.put(pi,{'x': x, 'f': f}) + stage_v = stager.put(pi, {'v': v}) + peek_xf = stager.peek(pei, ['x', 'f']) + peek_v = stager.peek(pei, ['v']) + key_xf, get_xf = stager.get(gi, ['x', 'f']) + key_v, get_v = stager.get(gi, ['v']) + pop_key_xf, pop_xf = stager.get(indices=['x', 'f']) + pop_key_v, pop_v = stager.get(pi, ['v']) + size = stager.size() + isize = stager.incomplete_size() + + G.finalize() + + with self.test_session(use_gpu=True, graph=G) as sess: + # 0 complete and incomplete entries + self.assertTrue(sess.run([size, isize]) == [0, 0]) + # Stage key 0, x and f tuple entries + sess.run(stage_xf, feed_dict={pi: 0, x: 1, f: 2}) + self.assertTrue(sess.run([size, isize]) == [0, 1]) + # Stage key 1, x and f tuple entries + sess.run(stage_xf, feed_dict={pi: 1, x: 1, f: 2}) + self.assertTrue(sess.run([size, isize]) == [0, 2]) + + # Now complete key 0 with tuple entry v + sess.run(stage_v, feed_dict={pi: 0, v: 1}) + # 1 complete and 1 incomplete entry + self.assertTrue(sess.run([size, isize]) == [1, 1]) + + # We can now peek at 'x' and 'f' values associated with key 0 + self.assertTrue(sess.run(peek_xf, feed_dict={pei:0}) + == { 'x':1, 'f':2}) + # Peek at 'v' value associated with key 0 + self.assertTrue(sess.run(peek_v, feed_dict={pei:0}) + == { 'v':1}) + # 1 complete and 1 incomplete entry + self.assertTrue(sess.run([size, isize]) == [1, 1]) + + # We can now obtain 'x' and 'f' values associated with key 0 + self.assertTrue(sess.run([key_xf, get_xf], feed_dict={gi:0}) + == [0, { 'x':1, 'f':2}]) + # Still have 1 complete and 1 incomplete entry + self.assertTrue(sess.run([size, isize]) == [1, 1]) + + # We can no longer get 'x' and 'f' from key 0 + with self.assertRaises(errors.InvalidArgumentError) as cm: + sess.run([key_xf, get_xf], feed_dict={gi:0}) + + exc_str = ("Tensor at index '0' for key '0' " + "has already been removed.") + + self.assertTrue(exc_str in cm.exception.message) + + # Obtain 'v' value associated with key 0 + self.assertTrue(sess.run([key_v, get_v], feed_dict={gi:0}) + == [0, { 'v':1}]) + # 0 complete and 1 incomplete entry + self.assertTrue(sess.run([size, isize]) == [0, 1]) + + # Now complete key 1 with tuple entry v + sess.run(stage_v, feed_dict={pi: 1, v: 1}) + # 1 complete and 1 incomplete entry + self.assertTrue(sess.run([size, isize]) == [1, 0]) + + # Pop without key to obtain 'x' and 'f' values associated with key 1 + self.assertTrue(sess.run([pop_key_xf, pop_xf]) + == [1, { 'x':1, 'f':2}]) + # still 1 complete and 1 incomplete entry + self.assertTrue(sess.run([size, isize]) == [1, 0]) + # We can now obtain 'x' and 'f' values associated with key 1 + self.assertTrue(sess.run([pop_key_v, pop_v], feed_dict={pi:1}) + == [1, { 'v': 1 }]) + # Nothing is left + self.assertTrue(sess.run([size, isize]) == [0, 0]) + + def testPartialIndexGets(self): + with ops.Graph().as_default() as G: + with ops.device('/cpu:0'): + x = array_ops.placeholder(dtypes.float32) + f = array_ops.placeholder(dtypes.float32) + v = array_ops.placeholder(dtypes.float32) + pi = array_ops.placeholder(dtypes.int64) + pei = array_ops.placeholder(dtypes.int64) + gi = array_ops.placeholder(dtypes.int64) + with ops.device(test.gpu_device_name()): + # Test again with partial index gets + stager = data_flow_ops.MapStagingArea( + [dtypes.float32, dtypes.float32, dtypes.float32]) + stage_xvf = stager.put(pi, [x, v, f], [0, 1, 2]) + key_xf, get_xf = stager.get(gi, [0, 2]) + key_v, get_v = stager.get(gi, [1]) + size = stager.size() + isize = stager.incomplete_size() + + G.finalize() + + with self.test_session(use_gpu=True, graph=G) as sess: + # Stage complete tuple + sess.run(stage_xvf, feed_dict={pi: 0, x: 1, f: 2, v: 3}) + + self.assertTrue(sess.run([size, isize]) == [1, 0]) + + # Partial get using indices + self.assertTrue(sess.run([key_xf, get_xf], + feed_dict={gi: 0}) == [0, [1, 2]]) + + # Still some of key 0 left + self.assertTrue(sess.run([size, isize]) == [1, 0]) + + # Partial get of remaining index + self.assertTrue(sess.run([key_v, get_v], + feed_dict={gi: 0}) == [0, [3]]) + + # All gone + self.assertTrue(sess.run([size, isize]) == [0, 0]) + +if __name__ == '__main__': + test.main() |