Diffstat (limited to 'tensorflow/contrib/eager/python/examples/revnet/blocks_test.py')
-rw-r--r--  tensorflow/contrib/eager/python/examples/revnet/blocks_test.py | 264
 1 file changed, 104 insertions(+), 160 deletions(-)
diff --git a/tensorflow/contrib/eager/python/examples/revnet/blocks_test.py b/tensorflow/contrib/eager/python/examples/revnet/blocks_test.py
index a28ca6e3e0..d74785c8fe 100644
--- a/tensorflow/contrib/eager/python/examples/revnet/blocks_test.py
+++ b/tensorflow/contrib/eager/python/examples/revnet/blocks_test.py
@@ -22,6 +22,27 @@ import tensorflow as tf
from tensorflow.contrib.eager.python.examples.revnet import blocks
+def compute_degree(g1, g2, eps=1e-7):
+ """Compute the degree between two vectors using their usual inner product."""
+
+ def _dot(u, v):
+ return tf.reduce_sum(u * v)
+
+ g1_norm = tf.sqrt(_dot(g1, g1))
+ g2_norm = tf.sqrt(_dot(g2, g2))
+ if g1_norm.numpy() == 0 and g2_norm.numpy() == 0:
+ cosine = 1. - eps
+ else:
+ g1_norm = 1. if g1_norm.numpy() == 0 else g1_norm
+ g2_norm = 1. if g2_norm.numpy() == 0 else g2_norm
+ cosine = _dot(g1, g2) / g1_norm / g2_norm
+ # Clamp cosine to the domain of arccos
+ cosine = tf.minimum(tf.maximum(cosine, eps - 1.), 1. - eps)
+ degree = tf.acos(cosine) * 180. / 3.141592653589793
+
+ return degree
+
+
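For reference, a quick usage sketch of the `compute_degree` helper added above (assuming eager execution is enabled, as it is when this test file runs): orthogonal vectors come out at 90 degrees, and identical vectors near 0 (not exactly 0, because the cosine is clamped to 1 - eps).

# Usage sketch for compute_degree; assumes TF 1.x eager execution.
import tensorflow as tf
tf.enable_eager_execution()  # the test runner normally does this already

g1 = tf.constant([1., 0.])
g2 = tf.constant([0., 1.])
print(compute_degree(g1, g2).numpy())  # ~90.0
print(compute_degree(g1, g1).numpy())  # ~0.03, since cosine is clamped to 1 - eps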
def _validate_block_call_channels_last(block_factory, test):
"""Generic testing function for `channels_last` data format.
@@ -33,30 +54,30 @@ def _validate_block_call_channels_last(block_factory, test):
test: tf.test.TestCase object
"""
with tf.device("/cpu:0"): # NHWC format
- input_shape = (224, 224, 32)
+ input_shape = (8, 8, 128)
data_shape = (16,) + input_shape
x = tf.random_normal(shape=data_shape)
# Stride 1
block = block_factory(
- filters=64,
+ filters=128,
strides=(1, 1),
input_shape=input_shape,
data_format="channels_last")
y_tr, y_ev = block(x, training=True), block(x, training=False)
test.assertEqual(y_tr.shape, y_ev.shape)
- test.assertEqual(y_ev.shape, (16, 224, 224, 64))
+ test.assertEqual(y_ev.shape, (16, 8, 8, 128))
test.assertNotAllClose(y_tr, y_ev)
# Stride of 2
block = block_factory(
- filters=64,
+ filters=128,
strides=(2, 2),
input_shape=input_shape,
data_format="channels_last")
y_tr, y_ev = block(x, training=True), block(x, training=False)
test.assertEqual(y_tr.shape, y_ev.shape)
- test.assertEqual(y_ev.shape, (16, 112, 112, 64))
+ test.assertEqual(y_ev.shape, (16, 4, 4, 128))
test.assertNotAllClose(y_tr, y_ev)
@@ -74,22 +95,22 @@ def _validate_block_call_channels_first(block_factory, test):
test.skipTest("GPU not available")
with tf.device("/gpu:0"): # Default NCHW format
- input_shape = (32, 224, 224)
+ input_shape = (128, 8, 8)
data_shape = (16,) + input_shape
x = tf.random_normal(shape=data_shape)
# Stride of 1
- block = block_factory(filters=64, strides=(1, 1), input_shape=input_shape)
+ block = block_factory(filters=128, strides=(1, 1), input_shape=input_shape)
y_tr, y_ev = block(x, training=True), block(x, training=False)
test.assertEqual(y_tr.shape, y_ev.shape)
- test.assertEqual(y_ev.shape, (16, 64, 224, 224))
+ test.assertEqual(y_ev.shape, (16, 128, 8, 8))
test.assertNotAllClose(y_tr, y_ev)
# Stride of 2
- block = block_factory(filters=64, strides=(2, 2), input_shape=input_shape)
+ block = block_factory(filters=128, strides=(2, 2), input_shape=input_shape)
y_tr, y_ev = block(x, training=True), block(x, training=False)
test.assertEqual(y_tr.shape, y_ev.shape)
- test.assertEqual(y_ev.shape, (16, 64, 112, 112))
+ test.assertEqual(y_ev.shape, (16, 128, 4, 4))
test.assertNotAllClose(y_tr, y_ev)
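In both helpers the expected spatial sizes are consistent with the usual SAME-padding rule out = ceil(in / stride); that reading of the expected shapes is an assumption here, not something the diff asserts. A quick check under it:

# Sanity check for the expected shapes above, assuming SAME padding:
# out_spatial = ceil(in_spatial / stride).
import math

assert math.ceil(8 / 1) == 8  # stride (1, 1) -> (16, 128, 8, 8)
assert math.ceil(8 / 2) == 4  # stride (2, 2) -> (16, 128, 4, 4)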
@@ -101,121 +122,116 @@ class RevBlockTest(tf.test.TestCase):
self.skipTest("GPU not available")
with tf.device("/gpu:0"): # Default NCHW format
- input_shape = (32, 224, 224)
+ input_shape = (128, 8, 8)
data_shape = (16,) + input_shape
x = tf.random_normal(shape=data_shape)
# Stride of 1
block = blocks.RevBlock(
- n_res=3, filters=64, strides=(1, 1), input_shape=input_shape)
+ n_res=3, filters=128, strides=(1, 1), input_shape=input_shape)
y_tr, y_ev = block(x, training=True), block(x, training=False)
self.assertEqual(y_tr.shape, y_ev.shape)
- self.assertEqual(y_ev.shape, (16, 64, 224, 224))
+ self.assertEqual(y_ev.shape, (16, 128, 8, 8))
self.assertNotAllClose(y_tr, y_ev)
# Stride of 2
block = blocks.RevBlock(
- n_res=3, filters=64, strides=(2, 2), input_shape=input_shape)
+ n_res=3, filters=128, strides=(2, 2), input_shape=input_shape)
y_tr, y_ev = block(x, training=True), block(x, training=False)
self.assertEqual(y_tr.shape, y_ev.shape)
- self.assertEqual(y_ev.shape, [16, 64, 112, 112])
+ self.assertEqual(y_ev.shape, [16, 128, 4, 4])
self.assertNotAllClose(y_tr, y_ev)
def test_call_channels_last(self):
"""Test `call` function with `channels_last` data format."""
with tf.device("/cpu:0"): # NHWC format
- input_shape = (224, 224, 32)
+ input_shape = (8, 8, 128)
data_shape = (16,) + input_shape
x = tf.random_normal(shape=data_shape)
# Stride 1
block = blocks.RevBlock(
n_res=3,
- filters=64,
+ filters=128,
strides=(1, 1),
input_shape=input_shape,
data_format="channels_last")
y_tr, y_ev = block(x, training=True), block(x, training=False)
self.assertEqual(y_tr.shape, y_ev.shape)
- self.assertEqual(y_ev.shape, (16, 224, 224, 64))
+ self.assertEqual(y_ev.shape, (16, 8, 8, 128))
self.assertNotAllClose(y_tr, y_ev)
# Stride of 2
block = blocks.RevBlock(
n_res=3,
- filters=64,
+ filters=128,
strides=(2, 2),
input_shape=input_shape,
data_format="channels_last")
y_tr, y_ev = block(x, training=True), block(x, training=False)
self.assertEqual(y_tr.shape, y_ev.shape)
- self.assertEqual(y_ev.shape, (16, 112, 112, 64))
+ self.assertEqual(y_ev.shape, (16, 4, 4, 128))
self.assertNotAllClose(y_tr, y_ev)
+ def _check_grad_angle(self, grads, grads_true, atol=1e0):
+ """Check the angle between two list of vectors are all close."""
+ for g1, g2 in zip(grads, grads_true):
+ degree = compute_degree(g1, g2)
+ self.assertLessEqual(degree, atol)
+
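The angle check is scale-invariant: with the default atol=1e0, the reconstructed gradient may deviate from the tape gradient by at most one degree in direction, regardless of magnitude. A small illustration (eager execution assumed, as above):

# The angle check accepts parallel gradients of different magnitude,
# which an elementwise assertAllClose would reject.
import tensorflow as tf

g = tf.constant([1., 2., 3.])
print(compute_degree(g, 2. * g).numpy())  # ~0 degrees: same direction, double the norm
print(compute_degree(g, -g).numpy())      # ~180 degrees: opposite direction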
def test_backward_grads_and_vars_channels_first(self):
"""Test `backward` function with `channels_first` data format."""
if not tf.test.is_gpu_available():
self.skipTest("GPU not available")
with tf.device("/gpu:0"): # Default NCHW format
- input_shape = (32, 224, 224)
- data_shape = (16,) + input_shape
- x = tf.random_normal(shape=data_shape)
-
# Stride 1
- y = tf.random_normal(shape=data_shape)
- dy = tf.random_normal(shape=data_shape)
- block = blocks.RevBlock(
- n_res=3, filters=32, strides=(1, 1), input_shape=input_shape)
- dy, grads, vars_ = block.backward_grads_and_vars(x, y, dy)
- self.assertEqual(dy.shape, x.shape)
- self.assertTrue(isinstance(grads, list))
- self.assertTrue(isinstance(vars_, list))
-
- # Stride 2
- y = tf.random_normal(shape=(16, 32, 112, 112))
- dy = tf.random_normal(shape=(16, 32, 112, 112))
- block = blocks.RevBlock(
- n_res=3, filters=32, strides=(2, 2), input_shape=input_shape)
- dy, grads, vars_ = block.backward_grads_and_vars(x, y, dy)
- self.assertEqual(dy.shape, x.shape)
- self.assertTrue(isinstance(grads, list))
- self.assertTrue(isinstance(vars_, list))
-
- def test_backward_grads_and_vars_channels_last(self):
- """Test `backward` function with `channels_last` data format."""
- with tf.device("/cpu:0"): # NHWC format
- input_shape = (224, 224, 32)
+ input_shape = (128, 8, 8)
data_shape = (16,) + input_shape
- x = tf.random_normal(shape=data_shape)
-
- # Stride 1
- y = tf.random_normal(shape=data_shape)
- dy = tf.random_normal(shape=data_shape)
+ x = tf.random_normal(shape=data_shape, dtype=tf.float64)
+ dy = tf.random_normal(shape=data_shape, dtype=tf.float64)
block = blocks.RevBlock(
n_res=3,
- filters=32,
+ filters=128,
strides=(1, 1),
input_shape=input_shape,
- data_format="channels_last")
- dy, grads, vars_ = block.backward_grads_and_vars(x, y, dy)
- self.assertEqual(dy.shape, x.shape)
- self.assertTrue(isinstance(grads, list))
- self.assertTrue(isinstance(vars_, list))
+ fused=False,
+ dtype=tf.float64)
+ with tf.GradientTape() as tape:
+ tape.watch(x)
+ y = block(x, training=True)
+ # Compute grads from reconstruction
+ dx, dw, vars_ = block.backward_grads_and_vars(x, y, dy, training=True)
+ # Compute true grads
+ grads = tape.gradient(y, [x] + vars_, output_gradients=dy)
+ dx_true, dw_true = grads[0], grads[1:]
+ self.assertAllClose(dx_true, dx)
+ self.assertAllClose(dw_true, dw)
+ self._check_grad_angle(dx_true, dx)
+ self._check_grad_angle(dw_true, dw)
# Stride 2
- y = tf.random_normal(shape=(16, 112, 112, 32))
- dy = tf.random_normal(shape=(16, 112, 112, 32))
+ x = tf.random_normal(shape=data_shape, dtype=tf.float64)
+ dy = tf.random_normal(shape=(16, 128, 4, 4), dtype=tf.float64)
block = blocks.RevBlock(
n_res=3,
- filters=32,
+ filters=128,
strides=(2, 2),
input_shape=input_shape,
- data_format="channels_last")
- dy, grads, vars_ = block.backward_grads_and_vars(x, y, dy)
- self.assertEqual(dy.shape, x.shape)
- self.assertTrue(isinstance(grads, list))
- self.assertTrue(isinstance(vars_, list))
+ fused=False,
+ dtype=tf.float64)
+ with tf.GradientTape() as tape:
+ tape.watch(x)
+ y = block(x, training=True)
+ # Compute grads from reconstruction
+ dx, dw, vars_ = block.backward_grads_and_vars(x, y, dy, training=True)
+ # Compute true grads
+ grads = tape.gradient(y, [x] + vars_, output_gradients=dy)
+ dx_true, dw_true = grads[0], grads[1:]
+ self.assertAllClose(dx_true, dx)
+ self.assertAllClose(dw_true, dw)
+ self._check_grad_angle(dx_true, dx)
+ self._check_grad_angle(dw_true, dw)
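These tests exercise the defining property of reversible blocks: the inputs (and hence gradients) can be reconstructed from the outputs, so `backward_grads_and_vars` never needs stored forward activations. A minimal sketch of the additive coupling used by RevNets (Gomez et al., 2017), with hypothetical residual functions `f` and `g` standing in for the block's internal sub-networks:

# Additive coupling and its exact inverse (hypothetical f, g; the real
# RevBlock wires conv/batch-norm sub-networks through this pattern).
def rev_forward(x1, x2, f, g):
  y1 = x1 + f(x2)
  y2 = x2 + g(y1)
  return y1, y2

def rev_inverse(y1, y2, f, g):
  x2 = y2 - g(y1)  # recover x2 first, using only the outputs
  x1 = y1 - f(x2)  # then recover x1
  return x1, x2

Because the inputs are recoverable, the test can compare the gradients that `backward_grads_and_vars` reconstructs against the GradientTape baseline computed with `output_gradients=dy`.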
class _ResidualTest(tf.test.TestCase):
@@ -229,112 +245,40 @@ class _ResidualTest(tf.test.TestCase):
_validate_block_call_channels_first(blocks._Residual, self)
_validate_block_call_channels_last(blocks._Residual, self)
- def test_backward_channels_first(self):
- """Test `backward` function with `channels_first` data format."""
- if not tf.test.is_gpu_available():
- self.skipTest("GPU not available")
-
- with tf.device("/gpu:0"): # Default NCHW format
- input_shape = (16, 224, 224)
- data_shape = (16,) + input_shape
- x = tf.random_normal(shape=data_shape)
- residual = blocks._Residual(
- filters=16, strides=(1, 1), input_shape=input_shape)
-
- y_tr, y_ev = residual(x, training=True), residual(x, training=False)
- x_ = residual.backward(y_ev, training=False)
- self.assertAllClose(x, x_, rtol=1e-1, atol=1e-1)
- x_ = residual.backward(y_tr, training=True) # This updates moving avg
- self.assertAllClose(x, x_, rtol=1e-1, atol=1e-1)
-
- def test_backward_channels_last(self):
- """Test `backward` function with `channels_last` data format."""
- with tf.device("/cpu:0"): # NHWC format
- input_shape = (224, 224, 16)
- data_shape = (16,) + input_shape
- x = tf.random_normal(shape=data_shape)
- residual = blocks._Residual(
- filters=16,
- strides=(1, 1),
- input_shape=input_shape,
- data_format="channels_last")
-
- y_tr, y_ev = residual(x, training=True), residual(x, training=False)
- x_ = residual.backward(y_ev, training=False)
- self.assertAllClose(x, x_, rtol=1e-1, atol=1e-1)
- x_ = residual.backward(y_tr, training=True) # This updates moving avg
- self.assertAllClose(x, x_, rtol=1e-1, atol=1e-1)
-
def test_backward_grads_and_vars_channels_first(self):
"""Test `backward_grads` function with `channels_first` data format."""
if not tf.test.is_gpu_available():
self.skipTest("GPU not available")
with tf.device("/gpu:0"): # Default NCHW format
- input_shape = (16, 224, 224)
+ input_shape = (128, 8, 8)
data_shape = (16,) + input_shape
- x = tf.random_normal(shape=data_shape)
- dy = tf.random_normal(shape=data_shape)
+ # Use double precision for testing
+ x_true = tf.random_normal(shape=data_shape, dtype=tf.float64)
+ dy = tf.random_normal(shape=data_shape, dtype=tf.float64)
residual = blocks._Residual(
- filters=16, strides=(1, 1), input_shape=input_shape)
-
- vars_and_vals = residual.get_moving_stats()
- dx_tr, grads_tr, vars_tr = residual.backward_grads_and_vars(
- x, dy=dy, training=True)
- dx_ev, grads_ev, vars_ev = residual.backward_grads_and_vars(
- x, dy=dy, training=False)
- self.assertNotAllClose(dx_tr, dx_ev)
- self.assertTrue(isinstance(grads_tr, list))
- self.assertTrue(isinstance(grads_ev, list))
- self.assertTrue(isinstance(vars_tr, list))
- self.assertTrue(isinstance(vars_ev, list))
- for grad_tr, var_tr, grad_ev, var_ev in zip(grads_tr, vars_tr, grads_ev,
- vars_ev):
- self.assertEqual(grad_tr.shape, grad_ev.shape)
- self.assertEqual(var_tr.shape, var_ev.shape)
- self.assertEqual(grad_tr.shape, var_tr.shape)
-
- # Compare against the true gradient computed by the tape
- residual.restore_moving_stats(vars_and_vals)
- with tf.GradientTape(persistent=True) as tape:
- tape.watch(x)
- y = residual(x, training=True)
- grads = tape.gradient(
- y, [x] + residual.trainable_variables, output_gradients=[dy])
- dx_tr_true, grads_tr_true = grads[0], grads[1:]
+ filters=128,
+ strides=(1, 1),
+ input_shape=input_shape,
+ fused=False,
+ dtype=tf.float64)
- del tape
+ with tf.GradientTape() as tape:
+ x_true = tf.identity(x_true)
+ tape.watch(x_true)
+ y = residual(x_true, training=True)
- self.assertAllClose(dx_tr, dx_tr_true, rtol=1e-1, atol=1e-1)
- self.assertAllClose(grads_tr, grads_tr_true, rtol=1e-1, atol=1e-1)
+ # Gradients reconstructed via reversibility
+ x, dx, dw, vars_ = residual.backward_grads_and_vars(
+ y, dy=dy, training=True)
- def test_backward_grads_and_vars_channels_last(self):
- """Test `backward_grads` function with `channels_last` data format."""
- with tf.device("/cpu:0"): # NHWC format
- input_shape = (224, 224, 16)
- data_shape = (16,) + input_shape
- x = tf.random_normal(shape=data_shape)
- dy = tf.random_normal(shape=data_shape)
- residual = blocks._Residual(
- filters=16,
- strides=(1, 1),
- input_shape=input_shape,
- data_format="channels_last")
+ # True gradients computed by the tape
+ grads = tape.gradient(y, [x_true] + vars_, output_gradients=dy)
+ dx_true, dw_true = grads[0], grads[1:]
- dx_tr, grads_tr, vars_tr = residual.backward_grads_and_vars(
- x, dy=dy, training=True)
- dx_ev, grads_ev, vars_ev = residual.backward_grads_and_vars(
- x, dy=dy, training=False)
- self.assertNotAllClose(dx_tr, dx_ev)
- self.assertTrue(isinstance(grads_tr, list))
- self.assertTrue(isinstance(grads_ev, list))
- self.assertTrue(isinstance(vars_tr, list))
- self.assertTrue(isinstance(vars_ev, list))
- for grad_tr, var_tr, grad_ev, var_ev in zip(grads_tr, vars_tr, grads_ev,
- vars_ev):
- self.assertEqual(grad_tr.shape, grad_ev.shape)
- self.assertEqual(var_tr.shape, var_ev.shape)
- self.assertEqual(grad_tr.shape, var_tr.shape)
+ self.assertAllClose(x_true, x)
+ self.assertAllClose(dx_true, dx)
+ self.assertAllClose(dw_true, dw)
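The same tape-versus-reconstruction pattern, reduced to a toy invertible map so the two gradient paths can be verified by hand (a sketch assuming TF 1.x eager execution; `2. * dy` plays the role of the analytically reconstructed gradient):

# Toy version of the verification pattern: y = 2*x + 1 is invertible,
# and its input gradient under upstream gradient dy is exactly 2*dy.
import tensorflow as tf
tf.enable_eager_execution()

x = tf.random_normal(shape=(4,), dtype=tf.float64)
dy = tf.random_normal(shape=(4,), dtype=tf.float64)
with tf.GradientTape() as tape:
  tape.watch(x)
  y = 2. * x + 1.
dx_true = tape.gradient(y, x, output_gradients=dy)  # tape baseline
dx_manual = 2. * dy                                 # reconstructed by hand
assert tf.reduce_all(tf.equal(dx_true, dx_manual)).numpy()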
class _ResidualInnerTest(tf.test.TestCase):