"""Functional tests for the Adagrad optimizer."""
import tensorflow.python.platform

import numpy as np
import tensorflow as tf


class AdagradOptimizerTest(tf.test.TestCase):
    """Checks tf.train.AdagradOptimizer on dense and sparse gradients.

    The hard-coded expected values below are consistent with the update

        accum += grad ** 2
        var -= learning_rate * grad / sqrt(accum)

    being applied once per run of the update op.
    """

    def testBasic(self):
        """Three dense Adagrad steps move both variables as expected."""
        with self.test_session():
            var0 = tf.Variable([1.0, 2.0])
            var1 = tf.Variable([3.0, 4.0])
            grads0 = tf.constant([0.1, 0.1])
            grads1 = tf.constant([0.01, 0.01])
            ada_opt = tf.train.AdagradOptimizer(
                3.0, initial_accumulator_value=0.1)
            ada_update = ada_opt.apply_gradients(
                zip([grads0, grads1], [var0, var1]))
            tf.initialize_all_variables().run()

            # Fetch params to validate initial values.
            self.assertAllClose([1.0, 2.0], var0.eval())
            self.assertAllClose([3.0, 4.0], var1.eval())

            # Run 3 steps of adagrad.
            for _ in range(3):
                ada_update.run()

            # Validate updated params.
            self.assertAllClose(
                np.array([-1.6026098728179932, -0.6026098728179932]),
                var0.eval())
            self.assertAllClose(
                np.array([2.715679168701172, 3.715679168701172]),
                var1.eval())

    def testFloat64(self):
        """float64 losses, variables and gradients raise ValueError.

        Each rejection is followed by the equivalent all-float32 call,
        which must be accepted without raising.
        """
        with self.test_session():
            opt = tf.train.AdagradOptimizer(
                3.0, initial_accumulator_value=0.1)

            # compute_gradients.
            values = [1.0, 3.0]
            good_vars = [tf.Variable([v]) for v in values]
            bad_loss = tf.constant(2.0, tf.float64, name="bad_loss")
            self.assertRaisesRegexp(
                ValueError,
                r"Invalid type.*float64.*bad_loss.*expected.*float32",
                opt.compute_gradients, bad_loss, good_vars)
            bad_vars = [
                tf.Variable(np.array([v], np.float64), name="bad_var")
                for v in values]
            # The loss is cast to float32 so that only the variables are
            # of the wrong type here.
            self.assertRaisesRegexp(
                ValueError,
                r"Invalid type.*float64.*bad_var.*expected.*float32",
                opt.compute_gradients,
                tf.cast(bad_vars[0] + bad_vars[1], tf.float32),
                bad_vars)
            opt.compute_gradients(good_vars[0] + good_vars[1], good_vars)

            # apply_gradients.
            bad_grads = [
                tf.constant([0.1], dtype=np.float64, name="bad_grad"),
                tf.constant([0.01])]
            self.assertRaisesRegexp(
                ValueError,
                r"Invalid type.*float64.*bad_grad.*expected.*float32",
                opt.apply_gradients, zip(bad_grads, good_vars))
            good_grads = [tf.constant([0.01]), tf.constant([0.02])]
            self.assertRaisesRegexp(
                ValueError,
                r"Invalid type.*float64.*bad_var.*expected.*float32",
                opt.apply_gradients, zip(good_grads, bad_vars))
            opt.apply_gradients(zip(good_grads, good_vars))

    def testSparseBasic(self):
        """Sparse (IndexedSlices) gradients only update the indexed rows."""
        with self.test_session():
            var0 = tf.Variable([[1.0], [2.0]])
            var1 = tf.Variable([[3.0], [4.0]])
            # grads0 touches row 0 only, grads1 touches row 1 only.
            grads0 = tf.IndexedSlices(
                tf.constant([0.1], shape=[1, 1]),
                tf.constant([0]),
                tf.constant([2, 1]))
            grads1 = tf.IndexedSlices(
                tf.constant([0.01], shape=[1, 1]),
                tf.constant([1]),
                tf.constant([2, 1]))
            ada_opt = tf.train.AdagradOptimizer(
                3.0, initial_accumulator_value=0.1)
            ada_update = ada_opt.apply_gradients(
                zip([grads0, grads1], [var0, var1]))
            tf.initialize_all_variables().run()

            # Fetch params to validate initial values.
            self.assertAllClose([[1.0], [2.0]], var0.eval())
            self.assertAllClose([[3.0], [4.0]], var1.eval())

            # Run 3 steps of adagrad.
            for _ in range(3):
                ada_update.run()

            # Validate updated params: rows without a gradient are unchanged.
            self.assertAllClose([[-1.6026098728179932], [2.0]], var0.eval())
            self.assertAllClose([[3.0], [3.715679168701172]], var1.eval())

    def testSparseStability(self):
        """One sparse step with tiny gradients gives the same result each run."""
        with self.test_session():
            shape = [1, 6]
            var0 = tf.Variable(
                [[0.00872496, -0.106952, 0.110467, 0.226505, -0.0147257,
                  -0.0105945]])
            grads0 = tf.IndexedSlices(
                tf.constant(
                    [[-5.91278e-05, 5.31673e-05, -2.5779e-06, 4.29153e-05,
                      -8.4877e-05, -9.48906e-05]],
                    shape=shape),
                tf.constant([0]),
                tf.constant(shape))
            ada_opt = tf.train.AdagradOptimizer(
                1.0, initial_accumulator_value=0.1)
            ada_update = ada_opt.apply_gradients(zip([grads0], [var0]))
            self.assertEqual(["accumulator"], ada_opt.get_slot_names())
            slot0 = ada_opt.get_slot(var0, "accumulator")
            init = tf.initialize_all_variables()
            for _ in range(100):
                # Every pass starts from freshly initialized state and
                # applies a single update, so every pass must produce the
                # same accumulator and variable values.
                init.run()
                ada_update.run()
                # The squared gradients are far below the comparison
                # tolerance, so the accumulator still compares equal to 0.1.
                self.assertAllClose(
                    [[0.1, 0.1, 0.1, 0.1, 0.1, 0.1]], slot0.eval())
                self.assertAllClose(
                    [[0.00891194, -0.10712013, 0.11047515, 0.22636929,
                      -0.0144573, -0.01029443]],
                    var0.eval())

    def testSharing(self):
        """Two update ops from one optimizer share the same accumulators."""
        with self.test_session():
            var0 = tf.Variable([1.0, 2.0])
            var1 = tf.Variable([3.0, 4.0])
            grads0 = tf.constant([0.1, 0.1])
            grads1 = tf.constant([0.01, 0.01])
            # NOTE(review): no initial_accumulator_value is passed here, yet
            # the expected values equal those of testBasic (which passes
            # 0.1), so the test presumably relies on 0.1 being the default.
            ada_opt = tf.train.AdagradOptimizer(3.0)
            # Apply the optimizer twice. Both applications will use the
            # same accums.
            ada_update1 = ada_opt.apply_gradients(
                zip([grads0, grads1], [var0, var1]))
            ada_update2 = ada_opt.apply_gradients(
                zip([grads0, grads1], [var0, var1]))
            self.assertEqual(["accumulator"], ada_opt.get_slot_names())
            slot0 = ada_opt.get_slot(var0, "accumulator")
            self.assertEqual(slot0.get_shape(), var0.get_shape())
            slot1 = ada_opt.get_slot(var1, "accumulator")
            self.assertEqual(slot1.get_shape(), var1.get_shape())
            tf.initialize_all_variables().run()

            # Fetch params to validate initial values.
            self.assertAllClose([1.0, 2.0], var0.eval())
            self.assertAllClose([3.0, 4.0], var1.eval())

            # Mix the first and the second adagrad for 3 steps.
            ada_update1.run()
            ada_update2.run()
            ada_update1.run()

            # Validate updated params (the same as with only 1 Adagrad).
            self.assertAllClose(
                np.array([-1.6026098728179932, -0.6026098728179932]),
                var0.eval())
            self.assertAllClose(
                np.array([2.715679168701172, 3.715679168701172]),
                var1.eval())


if __name__ == "__main__":
    tf.test.main()