# Copyright 2017 The TensorFlow Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== """Functional tests for SpaceToBatch and BatchToSpace ops.""" from __future__ import absolute_import from __future__ import division from __future__ import print_function import numpy as np from tensorflow.compiler.tests import xla_test from tensorflow.python.framework import dtypes from tensorflow.python.ops import array_ops from tensorflow.python.ops import gen_array_ops from tensorflow.python.platform import test def space_to_batch_direct(input_array, block_shape, paddings): """Direct Python implementation of space-to-batch conversion. This is used for tests only. Args: input_array: N-D array block_shape: 1-D array of shape [num_block_dims]. paddings: 2-D array of shape [num_block_dims, 2]. Returns: Converted tensor. """ input_array = np.array(input_array) block_shape = np.array(block_shape) num_block_dims = len(block_shape) paddings = np.array(paddings).reshape((len(block_shape), 2)) padded = np.pad(input_array, pad_width=([[0, 0]] + list(paddings) + [[0, 0]] * (input_array.ndim - 1 - num_block_dims)), mode="constant") reshaped_padded_shape = [input_array.shape[0]] output_shape = [input_array.shape[0] * np.prod(block_shape)] for block_dim, block_shape_value in enumerate(block_shape): reduced_size = padded.shape[block_dim + 1] // block_shape_value reshaped_padded_shape.append(reduced_size) output_shape.append(reduced_size) reshaped_padded_shape.append(block_shape_value) reshaped_padded_shape.extend(input_array.shape[num_block_dims + 1:]) output_shape.extend(input_array.shape[num_block_dims + 1:]) reshaped_padded = padded.reshape(reshaped_padded_shape) permuted_reshaped_padded = np.transpose(reshaped_padded, ( list(np.arange(num_block_dims) * 2 + 2) + [0] + list(np.arange(num_block_dims) * 2 + 1) + list( np.arange(input_array.ndim - num_block_dims - 1) + 1 + num_block_dims * 2))) return permuted_reshaped_padded.reshape(output_shape) class SpaceToBatchTest(xla_test.XLATestCase): """Tests input-output pairs for the SpaceToBatch and BatchToSpace ops.""" def _testPad(self, inputs, paddings, block_size, outputs): with self.cached_session() as sess, self.test_scope(): for dtype in self.float_types: # outputs = space_to_batch(inputs) placeholder = array_ops.placeholder(dtype) x_tf = gen_array_ops.space_to_batch( placeholder, paddings, block_size=block_size) self.assertAllEqual(sess.run(x_tf, {placeholder: inputs}), outputs) # inputs = batch_to_space(outputs) x_tf = gen_array_ops.batch_to_space( placeholder, paddings, block_size=block_size) self.assertAllEqual(sess.run(x_tf, {placeholder: outputs}), inputs) def _testOne(self, inputs, block_size, outputs): paddings = np.zeros((2, 2), dtype=np.int32) self._testPad(inputs, paddings, block_size, outputs) # [1, 2, 2, 1] <-> [4, 1, 1, 1] def testSmallInput2x2(self): x_np = [[[[1], [2]], [[3], [4]]]] block_size = 2 x_out = [[[[1]]], [[[2]]], [[[3]]], [[[4]]]] self._testOne(x_np, block_size, x_out) # [1, 2, 2, 1] <-> [1, 3, 3, 1] (padding) <-> [9, 1, 1, 1] def testSmallInput2x2Pad1x0(self): x_np = [[[[1], [2]], [[3], [4]]]] paddings = np.array([[1, 0], [1, 0]], dtype=np.int32) block_size = 3 x_out = [[[[0]]], [[[0]]], [[[0]]], [[[0]]], [[[1]]], [[[2]]], [[[0]]], [[[3]]], [[[4]]]] self._testPad(x_np, paddings, block_size, x_out) # Test with depth larger than 1. # [1, 2, 2, 3] <-> [4, 1, 1, 3] def testDepthInput2x2(self): x_np = [[[[1, 2, 3], [4, 5, 6]], [[7, 8, 9], [10, 11, 12]]]] block_size = 2 x_out = [[[[1, 2, 3]]], [[[4, 5, 6]]], [[[7, 8, 9]]], [[[10, 11, 12]]]] self._testOne(x_np, block_size, x_out) # Test for larger input dimensions. # [1, 4, 4, 1] <-> [4, 2, 2, 1] def testLargerInput2x2(self): x_np = [[[[1], [2], [3], [4]], [[5], [6], [7], [8]], [[9], [10], [11], [12]], [[13], [14], [15], [16]]]] block_size = 2 x_out = [[[[1], [3]], [[9], [11]]], [[[2], [4]], [[10], [12]]], [[[5], [7]], [[13], [15]]], [[[6], [8]], [[14], [16]]]] self._testOne(x_np, block_size, x_out) # Test with batch larger than 1. # [2, 2, 4, 1] <-> [8, 1, 2, 1] def testBatchInput2x2(self): x_np = [[[[1], [2], [3], [4]], [[5], [6], [7], [8]]], [[[9], [10], [11], [12]], [[13], [14], [15], [16]]]] block_size = 2 x_out = [[[[1], [3]]], [[[9], [11]]], [[[2], [4]]], [[[10], [12]]], [[[5], [7]]], [[[13], [15]]], [[[6], [8]]], [[[14], [16]]]] self._testOne(x_np, block_size, x_out) # Tests for larger input spatial dimensions AND batch larger than 1, to ensure # that elements are correctly laid out spatially and properly interleaved # along the batch dimension. # [2, 4, 4, 1] <-> [8, 2, 2, 1] def testLargerInputBatch2x2(self): x_np = [[[[1], [2], [3], [4]], [[5], [6], [7], [8]], [[9], [10], [11], [12]], [[13], [14], [15], [16]]], [[[17], [18], [19], [20]], [[21], [22], [23], [24]], [[25], [26], [27], [28]], [[29], [30], [31], [32]]]] x_out = [[[[1], [3]], [[9], [11]]], [[[17], [19]], [[25], [27]]], [[[2], [4]], [[10], [12]]], [[[18], [20]], [[26], [28]]], [[[5], [7]], [[13], [15]]], [[[21], [23]], [[29], [31]]], [[[6], [8]], [[14], [16]]], [[[22], [24]], [[30], [32]]]] block_size = 2 self._testOne(x_np, block_size, x_out) class SpaceToBatchNDTest(xla_test.XLATestCase): """Tests input-output pairs for the SpaceToBatchND and BatchToSpaceND ops.""" def _testPad(self, inputs, block_shape, paddings, outputs): block_shape = np.array(block_shape) paddings = np.array(paddings).reshape((len(block_shape), 2)) with self.cached_session() as sess, self.test_scope(): for dtype in self.float_types: # TODO(b/68813416): Skip bfloat16's as the input type for direct is # float32 and results in a mismatch, while making testDirect provide the # correctly typed input results in 'no fill-function for data-type' # error. if dtype == dtypes.bfloat16.as_numpy_dtype: continue if dtype == np.float16: actual_inputs = np.array(inputs).astype(dtype) actual_paddings = np.array(paddings).astype(dtype) expected_outputs = np.array(outputs).astype(dtype) else: actual_inputs = inputs actual_paddings = paddings expected_outputs = outputs placeholder = array_ops.placeholder(dtype) # outputs = space_to_batch(inputs) x_tf = array_ops.space_to_batch_nd(placeholder, block_shape, actual_paddings) self.assertAllEqual( sess.run(x_tf, {placeholder: actual_inputs}), expected_outputs) # inputs = batch_to_space(outputs) placeholder = array_ops.placeholder(dtype) x_tf = array_ops.batch_to_space_nd(placeholder, block_shape, actual_paddings) self.assertAllEqual( sess.run(x_tf, {placeholder: expected_outputs}), actual_inputs) def _testDirect(self, input_shape, block_shape, paddings): inputs = np.arange(np.prod(input_shape), dtype=np.float32) inputs = inputs.reshape(input_shape) self._testPad(inputs, block_shape, paddings, space_to_batch_direct(inputs, block_shape, paddings)) def testZeroBlockDimsZeroRemainingDims(self): self._testPad( inputs=[1, 2], block_shape=[], paddings=[], outputs=[1, 2],) def testZeroBlockDimsOneRemainingDim(self): self._testPad( inputs=[[1, 2], [3, 4]], block_shape=[], paddings=[], outputs=[[1, 2], [3, 4]]) # Same thing, but with a no-op block dim. self._testPad( inputs=[[1, 2], [3, 4]], block_shape=[1], paddings=[[0, 0]], outputs=[[1, 2], [3, 4]]) def testZeroBlockDimsTwoRemainingDims(self): self._testPad( inputs=[[[1, 2], [3, 4]], [[5, 6], [7, 8]]], block_shape=[], paddings=[], outputs=[[[1, 2], [3, 4]], [[5, 6], [7, 8]]]) # Same thing, but with a no-op block dim. self._testPad( inputs=[[[1, 2], [3, 4]], [[5, 6], [7, 8]]], block_shape=[1], paddings=[[0, 0]], outputs=[[[1, 2], [3, 4]], [[5, 6], [7, 8]]]) # Same thing, but with two no-op block dims. self._testPad( inputs=[[[1, 2], [3, 4]], [[5, 6], [7, 8]]], block_shape=[1, 1], paddings=[[0, 0], [0, 0]], outputs=[[[1, 2], [3, 4]], [[5, 6], [7, 8]]]) def testOneBlockDimZeroRemainingDims(self): self._testPad( inputs=[[1, 2, 3], [4, 5, 6]], block_shape=[2], paddings=[1, 0], outputs=[[0, 2], [0, 5], [1, 3], [4, 6]]) def testOneBlockDimOneRemainingDim(self): self._testPad( inputs=[[[1, 11], [2, 21], [3, 31]], [[4, 41], [5, 51], [6, 61]]], block_shape=[2], paddings=[1, 0], outputs=[[[0, 0], [2, 21]], [[0, 0], [5, 51]], [[1, 11], [3, 31]], [[4, 41], [6, 61]]]) def testDirect0(self): # Test with zero-size remaining dimension. self._testDirect( input_shape=[3, 1, 2, 0], block_shape=[3], paddings=[[0, 2]]) def testDirect1(self): # Test with zero-size blocked dimension. self._testDirect( input_shape=[3, 0, 2, 5], block_shape=[3], paddings=[[0, 0]]) def testDirect2(self): # Test with padding up from zero size. self._testDirect( input_shape=[3, 0, 2, 5], block_shape=[3], paddings=[[1, 2]]) def testDirect3(self): self._testDirect( input_shape=[3, 3, 4, 5, 2], block_shape=[3, 4, 2], paddings=[[1, 2], [0, 0], [3, 0]]) def testDirect4(self): self._testDirect( input_shape=[3, 3, 4, 5, 2], block_shape=[3, 4, 2, 2], paddings=[[1, 2], [0, 0], [3, 0], [0, 0]]) def testDirect5(self): self._testDirect( input_shape=[3, 2, 2, 3, 4, 5, 2, 5], block_shape=[1, 1, 3, 4, 2, 2], paddings=[[0, 0], [0, 0], [1, 2], [0, 0], [3, 0], [0, 0]]) def testDirect6(self): self._testDirect( input_shape=[3, 2, 2, 3, 4, 5, 2, 5], block_shape=[1, 1, 3, 4, 2, 2, 1], paddings=[[0, 0], [0, 0], [1, 2], [0, 0], [3, 0], [0, 0], [0, 0]]) if __name__ == "__main__": test.main()