aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Vijay Vasudevan <vrv@google.com>2016-01-20 19:08:00 -0800
committerGravatar Manjunath Kudlur <keveman@gmail.com>2016-01-20 19:18:25 -0800
commite4029435aefcc6c406c9a485bcb7a0893a95f28e (patch)
treed7812cee2f9be1695a95f5eeed3f3a1495279b57
parent20882f501cb218b359ba9f20786a2217e68cc436 (diff)
Rollback of "Updated TensorBoard logic for discarding events after crashes to discard out of order events only within a particular tag. This was changed because race conditions in supervisor was causing many events to be unintentionally discarded."
Change: 112644077
-rw-r--r--tensorflow/python/summary/event_accumulator.py67
-rw-r--r--tensorflow/python/summary/event_accumulator_test.py44
-rw-r--r--tensorflow/python/summary/impl/reservoir.py14
-rw-r--r--tensorflow/python/summary/impl/reservoir_test.py17
4 files changed, 22 insertions, 120 deletions
diff --git a/tensorflow/python/summary/event_accumulator.py b/tensorflow/python/summary/event_accumulator.py
index 95e8e179f4..56bcd15d2b 100644
--- a/tensorflow/python/summary/event_accumulator.py
+++ b/tensorflow/python/summary/event_accumulator.py
@@ -178,7 +178,25 @@ class EventAccumulator(object):
## TODO(danmane): Have this check for restart events explicitly
if (event.step < self.most_recent_step and
event.HasField('summary')):
- self._Purge(event)
+
+ ## Keep data in reservoirs that has a step less than event.step
+ _NotExpired = lambda x: x.step < event.step
+ num_expired_scalars = self._scalars.FilterItems(_NotExpired)
+ num_expired_histograms = self._histograms.FilterItems(_NotExpired)
+ num_expired_compressed_histograms = self._compressed_histograms.FilterItems(
+ _NotExpired)
+ num_expired_images = self._images.FilterItems(_NotExpired)
+
+ purge_msg = (
+ 'Detected out of order event.step likely caused by a Tensorflow '
+ 'restart. Purging expired events from Tensorboard display '
+ 'between the previous step: {} (timestamp: {}) and current step:'
+ ' {} (timestamp: {}). Removing {} scalars, {} histograms, {} '
+ 'compressed histograms, and {} images.').format(
+ self.most_recent_step, self.most_recent_wall_time, event.step,
+ event.wall_time, num_expired_scalars, num_expired_histograms,
+ num_expired_compressed_histograms, num_expired_images)
+ logging.warn(purge_msg)
else:
self.most_recent_step = event.step
self.most_recent_wall_time = event.wall_time
@@ -452,53 +470,6 @@ class EventAccumulator(object):
)
self._images.AddItem(tag, event)
- def _Purge(self, event):
- """Purge all events that have occurred after the given event.step.
-
- Purge all events that occurred after the given event.step, but only for
- the tags that the event has. Non-sequential event.steps suggest that a
- Tensorflow restart occured, and we discard the out-of-order events to
- display a consistent view in TensorBoard.
-
- Previously, the purge logic discarded all events after event.step (not just
- within the affected tags), but this caused problems where race conditions in
- supervisor caused many events to be unintentionally discarded.
-
- Args:
- event: The event to use as reference for the purge. All events with
- the same tags, but with a greater event.step will be purged.
- """
-
- def _GetExpiredList(value):
- ## Keep data in reservoirs that has a step less than event.step
- _NotExpired = lambda x: x.step < event.step
- return [x.FilterItems(_NotExpired, value.tag)
- for x in (self._scalars, self._histograms,
- self._compressed_histograms, self._images)]
-
- expired_per_tag = [_GetExpiredList(value) for value in event.summary.value]
- expired_per_type = [sum(x) for x in zip(*expired_per_tag)]
-
- if sum(expired_per_type) > 0:
- purge_msg = _GetPurgeMessage(self.most_recent_step,
- self.most_recent_wall_time, event.step,
- event.wall_time, *expired_per_type)
- logging.warn(purge_msg)
-
-
-def _GetPurgeMessage(most_recent_step, most_recent_wall_time, event_step,
- event_wall_time, num_expired_scalars, num_expired_histos,
- num_expired_comp_histos, num_expired_images):
- """Return the string message associated with TensorBoard purges."""
- return ('Detected out of order event.step likely caused by '
- 'a TensorFlow restart. Purging expired events from Tensorboard'
- ' display between the previous step: {} (timestamp: {}) and '
- 'current step: {} (timestamp: {}). Removing {} scalars, {} '
- 'histograms, {} compressed histograms, and {} images.').format(
- most_recent_step, most_recent_wall_time, event_step,
- event_wall_time, num_expired_scalars, num_expired_histos,
- num_expired_comp_histos, num_expired_images)
-
def _GeneratorFromPath(path):
"""Create an event generator for file or directory at given path string."""
diff --git a/tensorflow/python/summary/event_accumulator_test.py b/tensorflow/python/summary/event_accumulator_test.py
index 3684bfc369..394a6d2290 100644
--- a/tensorflow/python/summary/event_accumulator_test.py
+++ b/tensorflow/python/summary/event_accumulator_test.py
@@ -26,8 +26,6 @@ import tensorflow as tf
from tensorflow.core.framework import graph_pb2
from tensorflow.python.platform import gfile
-from tensorflow.python.platform import googletest
-from tensorflow.python.platform import logging
from tensorflow.python.summary import event_accumulator as ea
@@ -96,7 +94,6 @@ class MockingEventAccumulatorTest(EventAccumulatorTest):
def setUp(self):
super(MockingEventAccumulatorTest, self).setUp()
- self.stubs = googletest.StubOutForTesting()
self.empty = {ea.IMAGES: [],
ea.SCALARS: [],
ea.HISTOGRAMS: [],
@@ -110,7 +107,6 @@ class MockingEventAccumulatorTest(EventAccumulatorTest):
ea.EventAccumulator = _FakeAccumulatorConstructor
def tearDown(self):
- self.stubs.CleanUp()
ea.EventAccumulator = self._real_constructor
ea._GeneratorFromPath = self._real_generator
@@ -380,9 +376,6 @@ class MockingEventAccumulatorTest(EventAccumulatorTest):
If a step value is observed to be lower than what was previously seen,
this should force a discard of all previous items that are outdated.
"""
- warnings = []
- self.stubs.Set(logging, 'warn', warnings.append)
-
gen = _EventGenerator()
acc = ea.EventAccumulator(gen)
gen.AddScalar('s1', wall_time=1, step=100, value=20)
@@ -396,45 +389,8 @@ class MockingEventAccumulatorTest(EventAccumulatorTest):
gen.AddScalar('s1', wall_time=1, step=201, value=20)
gen.AddScalar('s1', wall_time=1, step=301, value=20)
acc.Reload()
- ## Check that we have discarded 200 and 300 from s1
- self.assertEqual([x.step for x in acc.Scalars('s1')], [100, 101, 201, 301])
-
- ## Check that the logging message is correct
- self.assertEqual(warnings, [ea._GetPurgeMessage(300, 1, 101, 1, 2, 0, 0, 0)
- ])
-
- def testEventsDiscardedPerTagAfterRestart(self):
- """Tests that event discards after restart, only affect the misordered tag.
-
- If a step value is observed to be lower than what was previously seen,
- this should force a discard of all previous items that are outdated, but
- only for the out of order tag. Other tags should remain unaffected.
- """
- warnings = []
- self.stubs.Set(logging, 'warn', warnings.append)
-
- gen = _EventGenerator()
- acc = ea.EventAccumulator(gen)
- gen.AddScalar('s1', wall_time=1, step=100, value=20)
- gen.AddScalar('s1', wall_time=1, step=200, value=20)
- gen.AddScalar('s1', wall_time=1, step=300, value=20)
- gen.AddScalar('s1', wall_time=1, step=101, value=20)
- gen.AddScalar('s1', wall_time=1, step=201, value=20)
- gen.AddScalar('s1', wall_time=1, step=301, value=20)
-
- gen.AddScalar('s2', wall_time=1, step=101, value=20)
- gen.AddScalar('s2', wall_time=1, step=201, value=20)
- gen.AddScalar('s2', wall_time=1, step=301, value=20)
-
- acc.Reload()
## Check that we have discarded 200 and 300
self.assertEqual([x.step for x in acc.Scalars('s1')], [100, 101, 201, 301])
- self.assertEqual(warnings, [ea._GetPurgeMessage(300, 1, 101, 1, 2, 0, 0, 0)
- ])
-
- ## Check that s1 discards do not affect s2
- ## i.e. check that only events from the out of order tag are discarded
- self.assertEqual([x.step for x in acc.Scalars('s2')], [101, 201, 301])
def testOnlySummaryEventsTriggerDiscards(self):
"""Test that file version event doesnt trigger data purge."""
diff --git a/tensorflow/python/summary/impl/reservoir.py b/tensorflow/python/summary/impl/reservoir.py
index e9567de3b2..c5b5daff0c 100644
--- a/tensorflow/python/summary/impl/reservoir.py
+++ b/tensorflow/python/summary/impl/reservoir.py
@@ -117,26 +117,18 @@ class Reservoir(object):
bucket = self._buckets[key]
bucket.AddItem(item)
- def FilterItems(self, filterFn, key=None):
+ def FilterItems(self, filterFn):
"""Filter items within a Reservoir, using a filtering function.
Args:
filterFn: A function that returns True for the items to be kept.
- key: An optional bucket key to filter. If not specified, will filter all
- all buckets.
Returns:
The number of items removed.
"""
with self._mutex:
- if key:
- if key in self._buckets:
- return self._buckets[key].FilterItems(filterFn)
- else:
- return 0
- else:
- return sum(bucket.FilterItems(filterFn)
- for bucket in self._buckets.values())
+ return sum(bucket.FilterItems(filterFn)
+ for bucket in self._buckets.values())
class _ReservoirBucket(object):
diff --git a/tensorflow/python/summary/impl/reservoir_test.py b/tensorflow/python/summary/impl/reservoir_test.py
index a0065cef3c..b7f72e64de 100644
--- a/tensorflow/python/summary/impl/reservoir_test.py
+++ b/tensorflow/python/summary/impl/reservoir_test.py
@@ -94,23 +94,6 @@ class ReservoirTest(tf.test.TestCase):
r2.AddItem('key', i)
self.assertNotEqual(r1.Items(key), r2.Items(key))
- def testFilterItemsByKey(self):
- r = reservoir.Reservoir(100, seed=0)
- for i in xrange(10):
- r.AddItem('key1', i)
- r.AddItem('key2', i)
-
- self.assertEqual(len(r.Items('key1')), 10)
- self.assertEqual(len(r.Items('key2')), 10)
-
- self.assertEqual(r.FilterItems(lambda x: x <= 7, 'key2'), 2)
- self.assertEqual(len(r.Items('key2')), 8)
- self.assertEqual(len(r.Items('key1')), 10)
-
- self.assertEqual(r.FilterItems(lambda x: x <= 3, 'key1'), 6)
- self.assertEqual(len(r.Items('key1')), 4)
- self.assertEqual(len(r.Items('key2')), 8)
-
class ReservoirBucketTest(tf.test.TestCase):