# Copyright 2016 The TensorFlow Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== """Tests for Bigtable Ops.""" from __future__ import absolute_import from __future__ import division from __future__ import print_function from tensorflow.contrib import bigtable from tensorflow.contrib.bigtable.ops import gen_bigtable_ops from tensorflow.contrib.bigtable.ops import gen_bigtable_test_ops from tensorflow.contrib.bigtable.python.ops import bigtable_api from tensorflow.contrib.util import loader from tensorflow.python.data.ops import dataset_ops from tensorflow.python.framework import errors from tensorflow.python.platform import resource_loader from tensorflow.python.platform import test from tensorflow.python.util import compat _bigtable_so = loader.load_op_library( resource_loader.get_path_to_datafile("_bigtable_test.so")) def _ListOfTuplesOfStringsToBytes(values): return [(compat.as_bytes(i[0]), compat.as_bytes(i[1])) for i in values] class BigtableOpsTest(test.TestCase): COMMON_ROW_KEYS = ["r1", "r2", "r3"] COMMON_VALUES = ["v1", "v2", "v3"] def setUp(self): self._client = gen_bigtable_test_ops.bigtable_test_client() table = gen_bigtable_ops.bigtable_table(self._client, "testtable") self._table = bigtable.BigtableTable("testtable", None, table) def _makeSimpleDataset(self): output_rows = dataset_ops.Dataset.from_tensor_slices(self.COMMON_ROW_KEYS) output_values = dataset_ops.Dataset.from_tensor_slices(self.COMMON_VALUES) return dataset_ops.Dataset.zip((output_rows, output_values)) def _writeCommonValues(self, sess): output_ds = self._makeSimpleDataset() write_op = self._table.write(output_ds, ["cf1"], ["c1"]) sess.run(write_op) def runReadKeyTest(self, read_ds): itr = read_ds.make_initializable_iterator() n = itr.get_next() expected = list(self.COMMON_ROW_KEYS) expected.reverse() with self.test_session() as sess: self._writeCommonValues(sess) sess.run(itr.initializer) for i in range(3): output = sess.run(n) want = expected.pop() self.assertEqual( compat.as_bytes(want), compat.as_bytes(output), "Unequal at step %d: want: %s, got: %s" % (i, want, output)) def testReadPrefixKeys(self): self.runReadKeyTest(self._table.keys_by_prefix_dataset("r")) def testReadRangeKeys(self): self.runReadKeyTest(self._table.keys_by_range_dataset("r1", "r4")) def runScanTest(self, read_ds): itr = read_ds.make_initializable_iterator() n = itr.get_next() expected_keys = list(self.COMMON_ROW_KEYS) expected_keys.reverse() expected_values = list(self.COMMON_VALUES) expected_values.reverse() with self.test_session() as sess: self._writeCommonValues(sess) sess.run(itr.initializer) for i in range(3): output = sess.run(n) want = expected_keys.pop() self.assertEqual( compat.as_bytes(want), compat.as_bytes(output[0]), "Unequal keys at step %d: want: %s, got: %s" % (i, want, output[0])) want = expected_values.pop() self.assertEqual( compat.as_bytes(want), compat.as_bytes(output[1]), "Unequal values at step: %d: want: %s, got: %s" % (i, want, output[1])) def testScanPrefixStringCol(self): self.runScanTest(self._table.scan_prefix("r", cf1="c1")) def testScanPrefixListCol(self): self.runScanTest(self._table.scan_prefix("r", cf1=["c1"])) def testScanPrefixTupleCol(self): self.runScanTest(self._table.scan_prefix("r", columns=("cf1", "c1"))) def testScanRangeStringCol(self): self.runScanTest(self._table.scan_range("r1", "r4", cf1="c1")) def testScanRangeListCol(self): self.runScanTest(self._table.scan_range("r1", "r4", cf1=["c1"])) def testScanRangeTupleCol(self): self.runScanTest(self._table.scan_range("r1", "r4", columns=("cf1", "c1"))) def testLookup(self): ds = self._table.keys_by_prefix_dataset("r") ds = ds.apply(self._table.lookup_columns(cf1="c1")) itr = ds.make_initializable_iterator() n = itr.get_next() expected_keys = list(self.COMMON_ROW_KEYS) expected_values = list(self.COMMON_VALUES) expected_tuples = zip(expected_keys, expected_values) with self.test_session() as sess: self._writeCommonValues(sess) sess.run(itr.initializer) for i, elem in enumerate(expected_tuples): output = sess.run(n) self.assertEqual( compat.as_bytes(elem[0]), compat.as_bytes(output[0]), "Unequal keys at step %d: want: %s, got: %s" % (i, compat.as_bytes(elem[0]), compat.as_bytes(output[0]))) self.assertEqual( compat.as_bytes(elem[1]), compat.as_bytes(output[1]), "Unequal values at step %d: want: %s, got: %s" % (i, compat.as_bytes(elem[1]), compat.as_bytes(output[1]))) def testSampleKeys(self): ds = self._table.sample_keys() itr = ds.make_initializable_iterator() n = itr.get_next() expected_key = self.COMMON_ROW_KEYS[0] with self.test_session() as sess: self._writeCommonValues(sess) sess.run(itr.initializer) output = sess.run(n) self.assertEqual( compat.as_bytes(self.COMMON_ROW_KEYS[0]), compat.as_bytes(output), "Unequal keys: want: %s, got: %s" % (compat.as_bytes( self.COMMON_ROW_KEYS[0]), compat.as_bytes(output))) output = sess.run(n) self.assertEqual( compat.as_bytes(self.COMMON_ROW_KEYS[2]), compat.as_bytes(output), "Unequal keys: want: %s, got: %s" % (compat.as_bytes( self.COMMON_ROW_KEYS[2]), compat.as_bytes(output))) with self.assertRaises(errors.OutOfRangeError): sess.run(n) def runSampleKeyPairsTest(self, ds, expected_key_pairs): itr = ds.make_initializable_iterator() n = itr.get_next() with self.test_session() as sess: self._writeCommonValues(sess) sess.run(itr.initializer) for i, elems in enumerate(expected_key_pairs): output = sess.run(n) self.assertEqual( compat.as_bytes(elems[0]), compat.as_bytes(output[0]), "Unequal key pair (first element) at step %d; want: %s, got %s" % (i, compat.as_bytes(elems[0]), compat.as_bytes(output[0]))) self.assertEqual( compat.as_bytes(elems[1]), compat.as_bytes(output[1]), "Unequal key pair (second element) at step %d; want: %s, got %s" % (i, compat.as_bytes(elems[1]), compat.as_bytes(output[1]))) with self.assertRaises(errors.OutOfRangeError): sess.run(n) def testSampleKeyPairsSimplePrefix(self): ds = bigtable_api._BigtableSampleKeyPairsDataset( self._table, prefix="r", start="", end="") expected_key_pairs = [("r", "r1"), ("r1", "r3"), ("r3", "s")] self.runSampleKeyPairsTest(ds, expected_key_pairs) def testSampleKeyPairsSimpleRange(self): ds = bigtable_api._BigtableSampleKeyPairsDataset( self._table, prefix="", start="r1", end="r3") expected_key_pairs = [("r1", "r3")] self.runSampleKeyPairsTest(ds, expected_key_pairs) def testSampleKeyPairsSkipRangePrefix(self): ds = bigtable_api._BigtableSampleKeyPairsDataset( self._table, prefix="r2", start="", end="") expected_key_pairs = [("r2", "r3")] self.runSampleKeyPairsTest(ds, expected_key_pairs) def testSampleKeyPairsSkipRangeRange(self): ds = bigtable_api._BigtableSampleKeyPairsDataset( self._table, prefix="", start="r2", end="r3") expected_key_pairs = [("r2", "r3")] self.runSampleKeyPairsTest(ds, expected_key_pairs) def testSampleKeyPairsOffsetRanges(self): ds = bigtable_api._BigtableSampleKeyPairsDataset( self._table, prefix="", start="r2", end="r4") expected_key_pairs = [("r2", "r3"), ("r3", "r4")] self.runSampleKeyPairsTest(ds, expected_key_pairs) def testSampleKeyPairEverything(self): ds = bigtable_api._BigtableSampleKeyPairsDataset( self._table, prefix="", start="", end="") expected_key_pairs = [("", "r1"), ("r1", "r3"), ("r3", "")] self.runSampleKeyPairsTest(ds, expected_key_pairs) def testSampleKeyPairsPrefixAndStartKey(self): ds = bigtable_api._BigtableSampleKeyPairsDataset( self._table, prefix="r", start="r1", end="") itr = ds.make_initializable_iterator() with self.test_session() as sess: with self.assertRaises(errors.InvalidArgumentError): sess.run(itr.initializer) def testSampleKeyPairsPrefixAndEndKey(self): ds = bigtable_api._BigtableSampleKeyPairsDataset( self._table, prefix="r", start="", end="r3") itr = ds.make_initializable_iterator() with self.test_session() as sess: with self.assertRaises(errors.InvalidArgumentError): sess.run(itr.initializer) def testParallelScanPrefix(self): ds = self._table.parallel_scan_prefix(prefix="r", cf1="c1") itr = ds.make_initializable_iterator() n = itr.get_next() with self.test_session() as sess: self._writeCommonValues(sess) sess.run(itr.initializer) expected_values = list(zip(self.COMMON_ROW_KEYS, self.COMMON_VALUES)) actual_values = [] for _ in range(len(expected_values)): output = sess.run(n) actual_values.append(output) with self.assertRaises(errors.OutOfRangeError): sess.run(n) self.assertItemsEqual( _ListOfTuplesOfStringsToBytes(expected_values), _ListOfTuplesOfStringsToBytes(actual_values)) def testParallelScanRange(self): ds = self._table.parallel_scan_range(start="r1", end="r4", cf1="c1") itr = ds.make_initializable_iterator() n = itr.get_next() with self.test_session() as sess: self._writeCommonValues(sess) sess.run(itr.initializer) expected_values = list(zip(self.COMMON_ROW_KEYS, self.COMMON_VALUES)) actual_values = [] for _ in range(len(expected_values)): output = sess.run(n) actual_values.append(output) with self.assertRaises(errors.OutOfRangeError): sess.run(n) self.assertItemsEqual( _ListOfTuplesOfStringsToBytes(expected_values), _ListOfTuplesOfStringsToBytes(actual_values)) if __name__ == "__main__": test.main()