diff options
author | Vijay Vasudevan <vrv@google.com> | 2016-01-20 19:08:00 -0800 |
---|---|---|
committer | Manjunath Kudlur <keveman@gmail.com> | 2016-01-20 19:18:25 -0800 |
commit | e4029435aefcc6c406c9a485bcb7a0893a95f28e (patch) | |
tree | d7812cee2f9be1695a95f5eeed3f3a1495279b57 | |
parent | 20882f501cb218b359ba9f20786a2217e68cc436 (diff) |
Rollback of "Updated TensorBoard logic for discarding events after crashes to discard out of order events only within a particular tag. This was changed because race conditions in supervisor was causing many events to be unintentionally discarded."
Change: 112644077
-rw-r--r-- | tensorflow/python/summary/event_accumulator.py | 67 | ||||
-rw-r--r-- | tensorflow/python/summary/event_accumulator_test.py | 44 | ||||
-rw-r--r-- | tensorflow/python/summary/impl/reservoir.py | 14 | ||||
-rw-r--r-- | tensorflow/python/summary/impl/reservoir_test.py | 17 |
4 files changed, 22 insertions, 120 deletions
diff --git a/tensorflow/python/summary/event_accumulator.py b/tensorflow/python/summary/event_accumulator.py index 95e8e179f4..56bcd15d2b 100644 --- a/tensorflow/python/summary/event_accumulator.py +++ b/tensorflow/python/summary/event_accumulator.py @@ -178,7 +178,25 @@ class EventAccumulator(object): ## TODO(danmane): Have this check for restart events explicitly if (event.step < self.most_recent_step and event.HasField('summary')): - self._Purge(event) + + ## Keep data in reservoirs that has a step less than event.step + _NotExpired = lambda x: x.step < event.step + num_expired_scalars = self._scalars.FilterItems(_NotExpired) + num_expired_histograms = self._histograms.FilterItems(_NotExpired) + num_expired_compressed_histograms = self._compressed_histograms.FilterItems( + _NotExpired) + num_expired_images = self._images.FilterItems(_NotExpired) + + purge_msg = ( + 'Detected out of order event.step likely caused by a Tensorflow ' + 'restart. Purging expired events from Tensorboard display ' + 'between the previous step: {} (timestamp: {}) and current step:' + ' {} (timestamp: {}). Removing {} scalars, {} histograms, {} ' + 'compressed histograms, and {} images.').format( + self.most_recent_step, self.most_recent_wall_time, event.step, + event.wall_time, num_expired_scalars, num_expired_histograms, + num_expired_compressed_histograms, num_expired_images) + logging.warn(purge_msg) else: self.most_recent_step = event.step self.most_recent_wall_time = event.wall_time @@ -452,53 +470,6 @@ class EventAccumulator(object): ) self._images.AddItem(tag, event) - def _Purge(self, event): - """Purge all events that have occurred after the given event.step. - - Purge all events that occurred after the given event.step, but only for - the tags that the event has. Non-sequential event.steps suggest that a - Tensorflow restart occured, and we discard the out-of-order events to - display a consistent view in TensorBoard. - - Previously, the purge logic discarded all events after event.step (not just - within the affected tags), but this caused problems where race conditions in - supervisor caused many events to be unintentionally discarded. - - Args: - event: The event to use as reference for the purge. All events with - the same tags, but with a greater event.step will be purged. - """ - - def _GetExpiredList(value): - ## Keep data in reservoirs that has a step less than event.step - _NotExpired = lambda x: x.step < event.step - return [x.FilterItems(_NotExpired, value.tag) - for x in (self._scalars, self._histograms, - self._compressed_histograms, self._images)] - - expired_per_tag = [_GetExpiredList(value) for value in event.summary.value] - expired_per_type = [sum(x) for x in zip(*expired_per_tag)] - - if sum(expired_per_type) > 0: - purge_msg = _GetPurgeMessage(self.most_recent_step, - self.most_recent_wall_time, event.step, - event.wall_time, *expired_per_type) - logging.warn(purge_msg) - - -def _GetPurgeMessage(most_recent_step, most_recent_wall_time, event_step, - event_wall_time, num_expired_scalars, num_expired_histos, - num_expired_comp_histos, num_expired_images): - """Return the string message associated with TensorBoard purges.""" - return ('Detected out of order event.step likely caused by ' - 'a TensorFlow restart. Purging expired events from Tensorboard' - ' display between the previous step: {} (timestamp: {}) and ' - 'current step: {} (timestamp: {}). Removing {} scalars, {} ' - 'histograms, {} compressed histograms, and {} images.').format( - most_recent_step, most_recent_wall_time, event_step, - event_wall_time, num_expired_scalars, num_expired_histos, - num_expired_comp_histos, num_expired_images) - def _GeneratorFromPath(path): """Create an event generator for file or directory at given path string.""" diff --git a/tensorflow/python/summary/event_accumulator_test.py b/tensorflow/python/summary/event_accumulator_test.py index 3684bfc369..394a6d2290 100644 --- a/tensorflow/python/summary/event_accumulator_test.py +++ b/tensorflow/python/summary/event_accumulator_test.py @@ -26,8 +26,6 @@ import tensorflow as tf from tensorflow.core.framework import graph_pb2 from tensorflow.python.platform import gfile -from tensorflow.python.platform import googletest -from tensorflow.python.platform import logging from tensorflow.python.summary import event_accumulator as ea @@ -96,7 +94,6 @@ class MockingEventAccumulatorTest(EventAccumulatorTest): def setUp(self): super(MockingEventAccumulatorTest, self).setUp() - self.stubs = googletest.StubOutForTesting() self.empty = {ea.IMAGES: [], ea.SCALARS: [], ea.HISTOGRAMS: [], @@ -110,7 +107,6 @@ class MockingEventAccumulatorTest(EventAccumulatorTest): ea.EventAccumulator = _FakeAccumulatorConstructor def tearDown(self): - self.stubs.CleanUp() ea.EventAccumulator = self._real_constructor ea._GeneratorFromPath = self._real_generator @@ -380,9 +376,6 @@ class MockingEventAccumulatorTest(EventAccumulatorTest): If a step value is observed to be lower than what was previously seen, this should force a discard of all previous items that are outdated. """ - warnings = [] - self.stubs.Set(logging, 'warn', warnings.append) - gen = _EventGenerator() acc = ea.EventAccumulator(gen) gen.AddScalar('s1', wall_time=1, step=100, value=20) @@ -396,45 +389,8 @@ class MockingEventAccumulatorTest(EventAccumulatorTest): gen.AddScalar('s1', wall_time=1, step=201, value=20) gen.AddScalar('s1', wall_time=1, step=301, value=20) acc.Reload() - ## Check that we have discarded 200 and 300 from s1 - self.assertEqual([x.step for x in acc.Scalars('s1')], [100, 101, 201, 301]) - - ## Check that the logging message is correct - self.assertEqual(warnings, [ea._GetPurgeMessage(300, 1, 101, 1, 2, 0, 0, 0) - ]) - - def testEventsDiscardedPerTagAfterRestart(self): - """Tests that event discards after restart, only affect the misordered tag. - - If a step value is observed to be lower than what was previously seen, - this should force a discard of all previous items that are outdated, but - only for the out of order tag. Other tags should remain unaffected. - """ - warnings = [] - self.stubs.Set(logging, 'warn', warnings.append) - - gen = _EventGenerator() - acc = ea.EventAccumulator(gen) - gen.AddScalar('s1', wall_time=1, step=100, value=20) - gen.AddScalar('s1', wall_time=1, step=200, value=20) - gen.AddScalar('s1', wall_time=1, step=300, value=20) - gen.AddScalar('s1', wall_time=1, step=101, value=20) - gen.AddScalar('s1', wall_time=1, step=201, value=20) - gen.AddScalar('s1', wall_time=1, step=301, value=20) - - gen.AddScalar('s2', wall_time=1, step=101, value=20) - gen.AddScalar('s2', wall_time=1, step=201, value=20) - gen.AddScalar('s2', wall_time=1, step=301, value=20) - - acc.Reload() ## Check that we have discarded 200 and 300 self.assertEqual([x.step for x in acc.Scalars('s1')], [100, 101, 201, 301]) - self.assertEqual(warnings, [ea._GetPurgeMessage(300, 1, 101, 1, 2, 0, 0, 0) - ]) - - ## Check that s1 discards do not affect s2 - ## i.e. check that only events from the out of order tag are discarded - self.assertEqual([x.step for x in acc.Scalars('s2')], [101, 201, 301]) def testOnlySummaryEventsTriggerDiscards(self): """Test that file version event doesnt trigger data purge.""" diff --git a/tensorflow/python/summary/impl/reservoir.py b/tensorflow/python/summary/impl/reservoir.py index e9567de3b2..c5b5daff0c 100644 --- a/tensorflow/python/summary/impl/reservoir.py +++ b/tensorflow/python/summary/impl/reservoir.py @@ -117,26 +117,18 @@ class Reservoir(object): bucket = self._buckets[key] bucket.AddItem(item) - def FilterItems(self, filterFn, key=None): + def FilterItems(self, filterFn): """Filter items within a Reservoir, using a filtering function. Args: filterFn: A function that returns True for the items to be kept. - key: An optional bucket key to filter. If not specified, will filter all - all buckets. Returns: The number of items removed. """ with self._mutex: - if key: - if key in self._buckets: - return self._buckets[key].FilterItems(filterFn) - else: - return 0 - else: - return sum(bucket.FilterItems(filterFn) - for bucket in self._buckets.values()) + return sum(bucket.FilterItems(filterFn) + for bucket in self._buckets.values()) class _ReservoirBucket(object): diff --git a/tensorflow/python/summary/impl/reservoir_test.py b/tensorflow/python/summary/impl/reservoir_test.py index a0065cef3c..b7f72e64de 100644 --- a/tensorflow/python/summary/impl/reservoir_test.py +++ b/tensorflow/python/summary/impl/reservoir_test.py @@ -94,23 +94,6 @@ class ReservoirTest(tf.test.TestCase): r2.AddItem('key', i) self.assertNotEqual(r1.Items(key), r2.Items(key)) - def testFilterItemsByKey(self): - r = reservoir.Reservoir(100, seed=0) - for i in xrange(10): - r.AddItem('key1', i) - r.AddItem('key2', i) - - self.assertEqual(len(r.Items('key1')), 10) - self.assertEqual(len(r.Items('key2')), 10) - - self.assertEqual(r.FilterItems(lambda x: x <= 7, 'key2'), 2) - self.assertEqual(len(r.Items('key2')), 8) - self.assertEqual(len(r.Items('key1')), 10) - - self.assertEqual(r.FilterItems(lambda x: x <= 3, 'key1'), 6) - self.assertEqual(len(r.Items('key1')), 4) - self.assertEqual(len(r.Items('key2')), 8) - class ReservoirBucketTest(tf.test.TestCase): |