diff options
author | 2017-09-27 11:26:16 -0700 | |
---|---|---|
committer | 2017-09-27 11:33:43 -0700 | |
commit | bced6676e260630c710345a21c280fda659100f6 (patch) | |
tree | 04026e5dd72709a0b063837c8605d94f16d076ed /tensorflow/contrib/factorization | |
parent | 970bdcc47a0085b4913232dd2eec87dc0d82f61e (diff) |
Automated g4 rollback of changelist 170204652
PiperOrigin-RevId: 170226583
Diffstat (limited to 'tensorflow/contrib/factorization')
-rw-r--r-- | tensorflow/contrib/factorization/BUILD | 3 | ||||
-rw-r--r-- | tensorflow/contrib/factorization/python/ops/factorization_ops_test.py | 382 |
2 files changed, 17 insertions, 368 deletions
diff --git a/tensorflow/contrib/factorization/BUILD b/tensorflow/contrib/factorization/BUILD index 214c4245cc..c468c544d3 100644 --- a/tensorflow/contrib/factorization/BUILD +++ b/tensorflow/contrib/factorization/BUILD @@ -195,9 +195,6 @@ tf_py_test( "//tensorflow/python:framework_for_generated_wrappers", "//tensorflow/python:framework_test_lib", "//tensorflow/python:platform_test", - "//tensorflow/python:sparse_ops", - "//tensorflow/python:training", - "//tensorflow/python:variables", ], ) diff --git a/tensorflow/contrib/factorization/python/ops/factorization_ops_test.py b/tensorflow/contrib/factorization/python/ops/factorization_ops_test.py index 1121d04f76..c813733915 100644 --- a/tensorflow/contrib/factorization/python/ops/factorization_ops_test.py +++ b/tensorflow/contrib/factorization/python/ops/factorization_ops_test.py @@ -18,8 +18,6 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import contextlib -import itertools import numpy as np from six.moves import xrange # pylint: disable=redefined-builtin @@ -31,18 +29,13 @@ from tensorflow.python.framework import sparse_tensor from tensorflow.python.ops import array_ops from tensorflow.python.ops import embedding_ops from tensorflow.python.ops import math_ops -from tensorflow.python.ops import sparse_ops -from tensorflow.python.ops import variables from tensorflow.python.platform import test -from tensorflow.python.training import coordinator -from tensorflow.python.training import input as input_lib -from tensorflow.python.training import queue_runner INPUT_MATRIX = factorization_ops_test_utils.INPUT_MATRIX np_matrix_to_tf_sparse = factorization_ops_test_utils.np_matrix_to_tf_sparse -class WALSModelTest(test.TestCase): +class WalsModelTest(test.TestCase): def sparse_input(self): return np_matrix_to_tf_sparse(INPUT_MATRIX) @@ -554,8 +547,10 @@ class WALSModelTest(test.TestCase): for r1, r2 in zip(row_factors1, row_factors2): self.assertAllClose(r1, r2, atol=1e-3) - rows = list(itertools.chain(*row_factors2)) - self.assertAllClose(als_projected_row_factors1, rows, atol=1e-3) + self.assertAllClose( + als_projected_row_factors1, + [row for shard in row_factors2 for row in shard], + atol=1e-3) # Here we test partial column updates. sp_c = np_matrix_to_tf_sparse( @@ -679,12 +674,9 @@ class WALSModelTest(test.TestCase): cols = 11 dims = 3 with ops.Graph().as_default(), self.test_session(): - data = np.dot(np.random.rand(rows, 3), np.random.rand(3, cols)).astype( - np.float32) / 3.0 - indices = [] - for i in xrange(rows): - for j in xrange(cols): - indices.append([i, j]) + data = np.dot(np.random.rand(rows, 3), np.random.rand( + 3, cols)).astype(np.float32) / 3.0 + indices = [[i, j] for i in xrange(rows) for j in xrange(cols)] values = data.reshape(-1) inp = sparse_tensor.SparseTensor(indices, values, [rows, cols]) model = factorization_ops.WALSModel( @@ -712,12 +704,9 @@ class WALSModelTest(test.TestCase): dims = 3 with ops.Graph().as_default(), self.test_session(): - data = np.dot(np.random.rand(rows, 3), np.random.rand(3, cols)).astype( - np.float32) / 3.0 - indices = [] - for i in xrange(rows): - for j in xrange(cols): - indices.append([i, j]) + data = np.dot(np.random.rand(rows, 3), np.random.rand( + 3, cols)).astype(np.float32) / 3.0 + indices = [[i, j] for i in xrange(rows) for j in xrange(cols)] values = data.reshape(-1) inp = sparse_tensor.SparseTensor(indices, values, [rows, cols]) model = factorization_ops.WALSModel( @@ -750,13 +739,12 @@ class WALSModelTest(test.TestCase): with ops.Graph().as_default(), self.test_session(): row_wts = 0.1 + np.random.rand(rows) col_wts = 0.1 + np.random.rand(cols) - data = np.dot(np.random.rand(rows, 3), np.random.rand(3, cols)).astype( - np.float32) / 3.0 - all_indices = [] - for i in xrange(rows): - for j in xrange(cols): - all_indices.append([i, j]) - indices = np.array(filter(keep_index, all_indices)) + data = np.dot(np.random.rand(rows, 3), np.random.rand( + 3, cols)).astype(np.float32) / 3.0 + indices = np.array( + list( + filter(keep_index, + [[i, j] for i in xrange(rows) for j in xrange(cols)]))) values = data[indices[:, 0], indices[:, 1]] inp = sparse_tensor.SparseTensor(indices, values, [rows, cols]) model = factorization_ops.WALSModel( @@ -835,341 +823,5 @@ class WALSModelTest(test.TestCase): self._run_test_sum_weights(False) -def _batch(sparse_matrix, num_rows, batch_size): - """Returns a SparseTensor containing a batch of rows from an input matrix.""" - # Create batch of matrix elements and corresponding row indices. - row_ids = math_ops.range(num_rows, dtype=dtypes.int64) - sparse_batch, row_ids_batch = input_lib.batch( - [sparse_matrix, row_ids], - batch_size=min(batch_size, num_rows), - capacity=10, - enqueue_many=True) - - # Remap the row indices and return the resulting SparseTensor. - old_row_ids, old_col_ids = array_ops.split( - value=sparse_batch.indices, num_or_size_splits=2, axis=1) - new_row_ids = array_ops.gather(row_ids_batch, old_row_ids) - new_indices = array_ops.concat([new_row_ids, old_col_ids], 1) - return sparse_ops.sparse_reorder( - sparse_tensor.SparseTensor( - indices=new_indices, - values=sparse_batch.values, - dense_shape=sparse_matrix.dense_shape)) - - -class WALSModelFactorizationTest(test.TestCase): - """Tests that execute an entire factorization sequence.""" - - def _setup_scenario(self, row_batch_size, col_batch_size): - """Set up a common scenario for factoring `INPUT_MATRIX`. - - This is for tests that factor `INPUT_MATRIX`, split into two row partitions - and three column partitions. It initializes the row and column factors to - fixed (not random) values. - - Args: - row_batch_size: Update this many rows at a time. - col_batch_size: Update this many columns at a time. - """ - # The initial factors. - self._row_factors_0 = [ - [ - [2., 2., 2.], - [2., 2., 2.], - [2., 2., 2.], - ], - [ - [2., 2., 2.], - [2., 2., 2.], - ], - ] - self._col_factors_0 = [ - [ - [1., 1., 1.], - [1., 1., 1.], - [1., 1., 1.], - ], - [ - [1., 1., 1.], - [1., 1., 1.], - ], - [ - [1., 1., 1.], - [1., 1., 1.], - ], - ] - - # The factors and total loss after a single row/col sweep. - self._row_factors_1 = [ - [ - [0.093546, 0.093553, 0.093553], - [0.420985, 0.420975, 0.420975], - [0.673242, 0.67328, 0.67328], - ], - [ - [1.013467, 1.013465, 1.013465], - [1.297011, 1.297039, 1.297039], - ], - ] - self._row_loss_1 = 13.124323844909668 - self._col_factors_1 = [ - [ - [0.882218, 0.882083, 0.882104], - [0.964144, 0.964672, 0.964648], - [0.871497, 0.869866, 0.869855], - ], - [ - [0.999492, 0.999434, 0.999458], - [1.052393, 1.052634, 1.052561], - ], - [ - [1.058472, 1.059054, 1.05908], - [1.107913, 1.107737, 1.107763], - ], - ] - self._col_loss_1 = 12.321547508239746 - - # The factors and total loss after a second row/col sweep. - self._row_factors_2 = [ - [ - [0.08223, 0.108721, 0.108142], - [0.412234, 0.41563, 0.415546], - [0.660805, 0.694732, 0.698372], - ], - [ - [1.109942, 1.01535, 1.018449], - [1.224644, 1.290318, 1.284723], - ], - ] - self._row_loss_2 = 12.234291076660156 - self._col_factors_2 = [ - [ - [2.689738, -0.26665, 0.107037], - [-1.746963, 2.472947, 2.107421], - [4.877673, -1.40563, -1.174043], - ], - [ - [2.394881, 0.058395, 0.448117], - [-1.754005, 2.605651, 2.243201], - ], - [ - [2.215456, 0.21321, 0.645511], - [-1.632659, 2.630967, 2.271138], - ], - ] - self._col_loss_2 = 11.303979873657227 - - num_rows = np.shape(INPUT_MATRIX)[0] - num_cols = np.shape(INPUT_MATRIX)[1] - - self._model = factorization_ops.WALSModel( - input_rows=num_rows, - input_cols=num_cols, - n_components=3, - unobserved_weight=0.1, - regularization=0.01, - row_init=self._row_factors_0, - col_init=self._col_factors_0, - num_row_shards=2, - num_col_shards=3, - row_weights=1., - col_weights=1., - use_factors_weights_cache=False) - - row_batch_items = _batch( - sparse_matrix=np_matrix_to_tf_sparse(INPUT_MATRIX), - num_rows=num_rows, - batch_size=row_batch_size) - col_batch_items = _batch( - sparse_matrix=np_matrix_to_tf_sparse(np.transpose(INPUT_MATRIX)), - num_rows=num_cols, - batch_size=col_batch_size) - - (_, self._row_update_op, row_unregularized_loss, row_regularization, - _) = self._model.update_row_factors(row_batch_items) - self._row_loss = row_unregularized_loss + row_regularization - (_, self._col_update_op, col_unregularized_loss, col_regularization, - _) = self._model.update_col_factors( - col_batch_items, transpose_input=True) - self._col_loss = col_unregularized_loss + col_regularization - - @contextlib.contextmanager - def _initiate_session(self): - """Manages a test session with queue-runner threads.""" - with self.test_session() as sess: - coord = coordinator.Coordinator() - threads = queue_runner.start_queue_runners(sess=sess, coord=coord) - yield sess - coord.request_stop() - coord.join(threads) - - def _initialize_model(self, sess): - """Runs initialization ops and tests the initial weights and factors.""" - sess.run(variables.global_variables_initializer()) - sess.run(self._model.initialize_op) - sess.run(self._model.worker_init) - self.assertAllPartitionsClose(sess, [ - [1., 1., 1.], - [1., 1.], - ], self._model.row_weights) - self.assertAllPartitionsClose(sess, [ - [1., 1., 1.], - [1., 1.], - [1., 1.], - ], self._model.col_weights) - self.assertAllPartitionsClose(sess, self._row_factors_0, - self._model.row_factors) - self.assertAllPartitionsClose(sess, self._col_factors_0, - self._model.col_factors) - - def _sweep(self, sess, init_ops, update_op, num_batches, expected_row_factors, - expected_col_factors): - """Runs a complete solving sweep (rows or cols) and tests the factors.""" - # Initialize row update. - for op in init_ops: - sess.run(op) - # Row or col update, done after `num_batches` batches. - for _ in xrange(num_batches): - sess.run(update_op) - self.assertAllPartitionsClose(sess, expected_row_factors, - self._model.row_factors) - self.assertAllPartitionsClose(sess, expected_col_factors, - self._model.col_factors) - # Test that the solve is idempotent. - sess.run(update_op) - self.assertAllPartitionsClose(sess, expected_row_factors, - self._model.row_factors) - self.assertAllPartitionsClose(sess, expected_col_factors, - self._model.col_factors) - - def assertAllPartitionsClose(self, sess, expected_partitions, got_partitions): - """Compares two lists of tensors.""" - self.assertAllClose( - dict(enumerate(expected_partitions)), - dict(enumerate(sess.run(got_partitions)))) - - def testBatched(self): - """Tests a scenario with row/col input split into batches. - - It is not too meaningful to test loss values in this scenario because - they are reported per batch, and how the input is broken up into batches - (including rollover) is determined by an underspecified external - component (the queue runner). - """ - self._setup_scenario(row_batch_size=4, col_batch_size=5) - - with self._initiate_session() as sess: - self._initialize_model(sess) - - # Row update. - self._sweep( - sess=sess, - init_ops=[ - self._model.row_update_prep_gramian_op, - self._model.initialize_row_update_op - ], - update_op=self._row_update_op, - num_batches=2, - expected_row_factors=self._row_factors_1, - expected_col_factors=self._col_factors_0) - - # Col update. - self._sweep( - sess=sess, - init_ops=[ - self._model.col_update_prep_gramian_op, - self._model.initialize_col_update_op - ], - update_op=self._col_update_op, - num_batches=2, - expected_row_factors=self._row_factors_1, - expected_col_factors=self._col_factors_1) - - # Row update. - self._sweep( - sess=sess, - init_ops=[ - self._model.row_update_prep_gramian_op, - self._model.initialize_row_update_op - ], - update_op=self._row_update_op, - num_batches=2, - expected_row_factors=self._row_factors_2, - expected_col_factors=self._col_factors_1) - - # Col update. - self._sweep( - sess=sess, - init_ops=[ - self._model.col_update_prep_gramian_op, - self._model.initialize_col_update_op - ], - update_op=self._col_update_op, - num_batches=2, - expected_row_factors=self._row_factors_2, - expected_col_factors=self._col_factors_2) - - def testFullBatch(self): - """Tests a scenario with all rows/cols processed in a single batch.""" - self._setup_scenario( - row_batch_size=np.shape(INPUT_MATRIX)[0], - col_batch_size=np.shape(INPUT_MATRIX)[1]) - - with self._initiate_session() as sess: - self._initialize_model(sess) - - # Row update. - self._sweep( - sess=sess, - init_ops=[ - self._model.row_update_prep_gramian_op, - self._model.initialize_row_update_op - ], - update_op=self._row_update_op, - num_batches=1, - expected_row_factors=self._row_factors_1, - expected_col_factors=self._col_factors_0) - self.assertAllClose(self._row_loss_1, sess.run(self._row_loss)) - - # Col update. - self._sweep( - sess=sess, - init_ops=[ - self._model.col_update_prep_gramian_op, - self._model.initialize_col_update_op - ], - update_op=self._col_update_op, - num_batches=1, - expected_row_factors=self._row_factors_1, - expected_col_factors=self._col_factors_1) - self.assertAllClose(self._col_loss_1, sess.run(self._col_loss)) - - # Row update. - self._sweep( - sess=sess, - init_ops=[ - self._model.row_update_prep_gramian_op, - self._model.initialize_row_update_op - ], - update_op=self._row_update_op, - num_batches=1, - expected_row_factors=self._row_factors_2, - expected_col_factors=self._col_factors_1) - self.assertAllClose(self._row_loss_2, sess.run(self._row_loss)) - - # Col update. - self._sweep( - sess=sess, - init_ops=[ - self._model.col_update_prep_gramian_op, - self._model.initialize_col_update_op - ], - update_op=self._col_update_op, - num_batches=1, - expected_row_factors=self._row_factors_2, - expected_col_factors=self._col_factors_2) - self.assertAllClose(self._col_loss_2, sess.run(self._col_loss)) - - if __name__ == "__main__": test.main() |