diff options
-rw-r--r-- | tensorflow/python/framework/test_util.py | 28 | ||||
-rw-r--r-- | tensorflow/python/framework/test_util_test.py | 16 | ||||
-rw-r--r-- | tensorflow/python/summary/event_accumulator.py | 67 | ||||
-rw-r--r-- | tensorflow/python/summary/event_accumulator_test.py | 44 | ||||
-rw-r--r-- | tensorflow/python/summary/impl/reservoir.py | 14 | ||||
-rw-r--r-- | tensorflow/python/summary/impl/reservoir_test.py | 17 | ||||
-rw-r--r-- | tensorflow/python/training/input.py | 13 | ||||
-rw-r--r-- | tensorflow/tensorboard/TAG | 2 |
8 files changed, 175 insertions, 26 deletions
diff --git a/tensorflow/python/framework/test_util.py b/tensorflow/python/framework/test_util.py index 0f4eb744f1..df08d5d5f0 100644 --- a/tensorflow/python/framework/test_util.py +++ b/tensorflow/python/framework/test_util.py @@ -44,6 +44,34 @@ from tensorflow.python.platform import logging from tensorflow.python.util.protobuf import compare +def assert_ops_in_graph(expected_ops, graph): + """Assert all expected operations are found. + + Args: + expected_ops: `dict<string, string>` of op name to op type. + graph: Graph to check. + Returns: + `dict<string, node>` of node name to node. + + Raises: + ValueError: If the expected ops are not present in the graph. + """ + actual_ops = {} + gd = graph.as_graph_def() + for node in gd.node: + if node.name in expected_ops: + if expected_ops[node.name] != node.op: + raise ValueError( + "Expected op for node %s is different. %s vs %s" % ( + node.name, expected_ops[node.name], node.op)) + actual_ops[node.name] = node + if set(expected_ops.keys()) != set(actual_ops.keys()): + raise ValueError( + "Not all expected ops are present. Expected %s, found %s" % ( + expected_ops.keys(), actual_ops.keys())) + return actual_ops + + def IsGoogleCudaEnabled(): return pywrap_tensorflow.IsGoogleCudaEnabled() diff --git a/tensorflow/python/framework/test_util_test.py b/tensorflow/python/framework/test_util_test.py index 69af7640b6..88ca99600e 100644 --- a/tensorflow/python/framework/test_util_test.py +++ b/tensorflow/python/framework/test_util_test.py @@ -27,15 +27,29 @@ from six.moves import xrange # pylint: disable=redefined-builtin from google.protobuf import text_format from tensorflow.core.framework import graph_pb2 -from tensorflow.python.framework import dtypes from tensorflow.python.framework import errors from tensorflow.python.framework import ops from tensorflow.python.framework import test_util from tensorflow.python.platform import googletest +from tensorflow.python.ops import constant_op from tensorflow.python.ops import logging_ops + class TestUtilTest(test_util.TensorFlowTestCase): + def test_assert_ops_in_graph(self): + with self.test_session(): + constant_op.constant(["hello", "taffy"], name="hello") + test_util.assert_ops_in_graph({"hello": "Const"}, ops.get_default_graph()) + + self.assertRaises( + ValueError, test_util.assert_ops_in_graph, {"bye": "Const"}, + ops.get_default_graph()) + + self.assertRaises( + ValueError, test_util.assert_ops_in_graph, {"hello": "Variable"}, + ops.get_default_graph()) + def testIsGoogleCudaEnabled(self): # The test doesn't assert anything. It ensures the py wrapper # function is generated correctly. diff --git a/tensorflow/python/summary/event_accumulator.py b/tensorflow/python/summary/event_accumulator.py index 56bcd15d2b..95e8e179f4 100644 --- a/tensorflow/python/summary/event_accumulator.py +++ b/tensorflow/python/summary/event_accumulator.py @@ -178,25 +178,7 @@ class EventAccumulator(object): ## TODO(danmane): Have this check for restart events explicitly if (event.step < self.most_recent_step and event.HasField('summary')): - - ## Keep data in reservoirs that has a step less than event.step - _NotExpired = lambda x: x.step < event.step - num_expired_scalars = self._scalars.FilterItems(_NotExpired) - num_expired_histograms = self._histograms.FilterItems(_NotExpired) - num_expired_compressed_histograms = self._compressed_histograms.FilterItems( - _NotExpired) - num_expired_images = self._images.FilterItems(_NotExpired) - - purge_msg = ( - 'Detected out of order event.step likely caused by a Tensorflow ' - 'restart. Purging expired events from Tensorboard display ' - 'between the previous step: {} (timestamp: {}) and current step:' - ' {} (timestamp: {}). Removing {} scalars, {} histograms, {} ' - 'compressed histograms, and {} images.').format( - self.most_recent_step, self.most_recent_wall_time, event.step, - event.wall_time, num_expired_scalars, num_expired_histograms, - num_expired_compressed_histograms, num_expired_images) - logging.warn(purge_msg) + self._Purge(event) else: self.most_recent_step = event.step self.most_recent_wall_time = event.wall_time @@ -470,6 +452,53 @@ class EventAccumulator(object): ) self._images.AddItem(tag, event) + def _Purge(self, event): + """Purge all events that have occurred after the given event.step. + + Purge all events that occurred after the given event.step, but only for + the tags that the event has. Non-sequential event.steps suggest that a + Tensorflow restart occured, and we discard the out-of-order events to + display a consistent view in TensorBoard. + + Previously, the purge logic discarded all events after event.step (not just + within the affected tags), but this caused problems where race conditions in + supervisor caused many events to be unintentionally discarded. + + Args: + event: The event to use as reference for the purge. All events with + the same tags, but with a greater event.step will be purged. + """ + + def _GetExpiredList(value): + ## Keep data in reservoirs that has a step less than event.step + _NotExpired = lambda x: x.step < event.step + return [x.FilterItems(_NotExpired, value.tag) + for x in (self._scalars, self._histograms, + self._compressed_histograms, self._images)] + + expired_per_tag = [_GetExpiredList(value) for value in event.summary.value] + expired_per_type = [sum(x) for x in zip(*expired_per_tag)] + + if sum(expired_per_type) > 0: + purge_msg = _GetPurgeMessage(self.most_recent_step, + self.most_recent_wall_time, event.step, + event.wall_time, *expired_per_type) + logging.warn(purge_msg) + + +def _GetPurgeMessage(most_recent_step, most_recent_wall_time, event_step, + event_wall_time, num_expired_scalars, num_expired_histos, + num_expired_comp_histos, num_expired_images): + """Return the string message associated with TensorBoard purges.""" + return ('Detected out of order event.step likely caused by ' + 'a TensorFlow restart. Purging expired events from Tensorboard' + ' display between the previous step: {} (timestamp: {}) and ' + 'current step: {} (timestamp: {}). Removing {} scalars, {} ' + 'histograms, {} compressed histograms, and {} images.').format( + most_recent_step, most_recent_wall_time, event_step, + event_wall_time, num_expired_scalars, num_expired_histos, + num_expired_comp_histos, num_expired_images) + def _GeneratorFromPath(path): """Create an event generator for file or directory at given path string.""" diff --git a/tensorflow/python/summary/event_accumulator_test.py b/tensorflow/python/summary/event_accumulator_test.py index 394a6d2290..3684bfc369 100644 --- a/tensorflow/python/summary/event_accumulator_test.py +++ b/tensorflow/python/summary/event_accumulator_test.py @@ -26,6 +26,8 @@ import tensorflow as tf from tensorflow.core.framework import graph_pb2 from tensorflow.python.platform import gfile +from tensorflow.python.platform import googletest +from tensorflow.python.platform import logging from tensorflow.python.summary import event_accumulator as ea @@ -94,6 +96,7 @@ class MockingEventAccumulatorTest(EventAccumulatorTest): def setUp(self): super(MockingEventAccumulatorTest, self).setUp() + self.stubs = googletest.StubOutForTesting() self.empty = {ea.IMAGES: [], ea.SCALARS: [], ea.HISTOGRAMS: [], @@ -107,6 +110,7 @@ class MockingEventAccumulatorTest(EventAccumulatorTest): ea.EventAccumulator = _FakeAccumulatorConstructor def tearDown(self): + self.stubs.CleanUp() ea.EventAccumulator = self._real_constructor ea._GeneratorFromPath = self._real_generator @@ -376,6 +380,9 @@ class MockingEventAccumulatorTest(EventAccumulatorTest): If a step value is observed to be lower than what was previously seen, this should force a discard of all previous items that are outdated. """ + warnings = [] + self.stubs.Set(logging, 'warn', warnings.append) + gen = _EventGenerator() acc = ea.EventAccumulator(gen) gen.AddScalar('s1', wall_time=1, step=100, value=20) @@ -389,8 +396,45 @@ class MockingEventAccumulatorTest(EventAccumulatorTest): gen.AddScalar('s1', wall_time=1, step=201, value=20) gen.AddScalar('s1', wall_time=1, step=301, value=20) acc.Reload() + ## Check that we have discarded 200 and 300 from s1 + self.assertEqual([x.step for x in acc.Scalars('s1')], [100, 101, 201, 301]) + + ## Check that the logging message is correct + self.assertEqual(warnings, [ea._GetPurgeMessage(300, 1, 101, 1, 2, 0, 0, 0) + ]) + + def testEventsDiscardedPerTagAfterRestart(self): + """Tests that event discards after restart, only affect the misordered tag. + + If a step value is observed to be lower than what was previously seen, + this should force a discard of all previous items that are outdated, but + only for the out of order tag. Other tags should remain unaffected. + """ + warnings = [] + self.stubs.Set(logging, 'warn', warnings.append) + + gen = _EventGenerator() + acc = ea.EventAccumulator(gen) + gen.AddScalar('s1', wall_time=1, step=100, value=20) + gen.AddScalar('s1', wall_time=1, step=200, value=20) + gen.AddScalar('s1', wall_time=1, step=300, value=20) + gen.AddScalar('s1', wall_time=1, step=101, value=20) + gen.AddScalar('s1', wall_time=1, step=201, value=20) + gen.AddScalar('s1', wall_time=1, step=301, value=20) + + gen.AddScalar('s2', wall_time=1, step=101, value=20) + gen.AddScalar('s2', wall_time=1, step=201, value=20) + gen.AddScalar('s2', wall_time=1, step=301, value=20) + + acc.Reload() ## Check that we have discarded 200 and 300 self.assertEqual([x.step for x in acc.Scalars('s1')], [100, 101, 201, 301]) + self.assertEqual(warnings, [ea._GetPurgeMessage(300, 1, 101, 1, 2, 0, 0, 0) + ]) + + ## Check that s1 discards do not affect s2 + ## i.e. check that only events from the out of order tag are discarded + self.assertEqual([x.step for x in acc.Scalars('s2')], [101, 201, 301]) def testOnlySummaryEventsTriggerDiscards(self): """Test that file version event doesnt trigger data purge.""" diff --git a/tensorflow/python/summary/impl/reservoir.py b/tensorflow/python/summary/impl/reservoir.py index c5b5daff0c..e9567de3b2 100644 --- a/tensorflow/python/summary/impl/reservoir.py +++ b/tensorflow/python/summary/impl/reservoir.py @@ -117,18 +117,26 @@ class Reservoir(object): bucket = self._buckets[key] bucket.AddItem(item) - def FilterItems(self, filterFn): + def FilterItems(self, filterFn, key=None): """Filter items within a Reservoir, using a filtering function. Args: filterFn: A function that returns True for the items to be kept. + key: An optional bucket key to filter. If not specified, will filter all + all buckets. Returns: The number of items removed. """ with self._mutex: - return sum(bucket.FilterItems(filterFn) - for bucket in self._buckets.values()) + if key: + if key in self._buckets: + return self._buckets[key].FilterItems(filterFn) + else: + return 0 + else: + return sum(bucket.FilterItems(filterFn) + for bucket in self._buckets.values()) class _ReservoirBucket(object): diff --git a/tensorflow/python/summary/impl/reservoir_test.py b/tensorflow/python/summary/impl/reservoir_test.py index b7f72e64de..a0065cef3c 100644 --- a/tensorflow/python/summary/impl/reservoir_test.py +++ b/tensorflow/python/summary/impl/reservoir_test.py @@ -94,6 +94,23 @@ class ReservoirTest(tf.test.TestCase): r2.AddItem('key', i) self.assertNotEqual(r1.Items(key), r2.Items(key)) + def testFilterItemsByKey(self): + r = reservoir.Reservoir(100, seed=0) + for i in xrange(10): + r.AddItem('key1', i) + r.AddItem('key2', i) + + self.assertEqual(len(r.Items('key1')), 10) + self.assertEqual(len(r.Items('key2')), 10) + + self.assertEqual(r.FilterItems(lambda x: x <= 7, 'key2'), 2) + self.assertEqual(len(r.Items('key2')), 8) + self.assertEqual(len(r.Items('key1')), 10) + + self.assertEqual(r.FilterItems(lambda x: x <= 3, 'key1'), 6) + self.assertEqual(len(r.Items('key1')), 4) + self.assertEqual(len(r.Items('key2')), 8) + class ReservoirBucketTest(tf.test.TestCase): diff --git a/tensorflow/python/training/input.py b/tensorflow/python/training/input.py index ae9df87dc1..88449890f7 100644 --- a/tensorflow/python/training/input.py +++ b/tensorflow/python/training/input.py @@ -28,7 +28,6 @@ from tensorflow.python.framework import dtypes from tensorflow.python.framework import ops from tensorflow.python.framework import tensor_shape from tensorflow.python.ops import array_ops -from tensorflow.python.ops import math_ops from tensorflow.python.ops import constant_op from tensorflow.python.ops import data_flow_ops from tensorflow.python.ops import io_ops @@ -60,12 +59,15 @@ def limit_epochs(tensor, num_epochs=None, name=None): Args: tensor: Any `Tensor`. - num_epochs: An integer (optional). If specified, limits the number + num_epochs: A positive integer (optional). If specified, limits the number of steps the output tensor may be evaluated. name: A name for the operations (optional). Returns: tensor or `OutOfRange`. + + Raises: + ValueError: if `num_epochs` is invalid. """ if num_epochs is None: return tensor @@ -176,6 +178,8 @@ def slice_input_producer(tensor_list, num_epochs=None, shuffle=True, seed=None, produces each slice `num_epochs` times before generating an `OutOfRange` error. If not specified, `slice_input_producer` can cycle through the slices an unlimited number of times. + shuffle: Boolean. If true, the integers are randomly shuffled within each + epoch. seed: An integer (optional). Seed used if shuffle == True. capacity: An integer. Sets the queue capacity. name: A name for the operations (optional). @@ -184,6 +188,9 @@ def slice_input_producer(tensor_list, num_epochs=None, shuffle=True, seed=None, A list of tensors, one for each element of `tensor_list`. If the tensor in `tensor_list` has shape `[N, a, b, .., z]`, then the corresponding output tensor will have shape `[a, b, ..., z]`. + + Raises: + ValueError: if `slice_input_producer` produces nothing from `tensor_list`. """ with ops.op_scope(tensor_list, name, "input_producer"): tensor_list = ops.convert_n_to_tensor_or_indexed_slices(tensor_list) @@ -202,6 +209,7 @@ def slice_input_producer(tensor_list, num_epochs=None, shuffle=True, seed=None, # Helpers for the batching functions ------------------------------------------ + def _flatten(tensor_list_list): return [tensor for tensor_list in tensor_list_list for tensor in tensor_list] @@ -270,6 +278,7 @@ def _enqueue(queue, tensor_list, threads, enqueue_many): # Batching functions ---------------------------------------------------------- + def batch(tensor_list, batch_size, num_threads=1, capacity=32, enqueue_many=False, shapes=None, name=None): """Creates batches of tensors in `tensor_list`. diff --git a/tensorflow/tensorboard/TAG b/tensorflow/tensorboard/TAG index f599e28b8a..b4de394767 100644 --- a/tensorflow/tensorboard/TAG +++ b/tensorflow/tensorboard/TAG @@ -1 +1 @@ -10 +11 |