aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--tensorflow/python/framework/test_util.py28
-rw-r--r--tensorflow/python/framework/test_util_test.py16
-rw-r--r--tensorflow/python/summary/event_accumulator.py67
-rw-r--r--tensorflow/python/summary/event_accumulator_test.py44
-rw-r--r--tensorflow/python/summary/impl/reservoir.py14
-rw-r--r--tensorflow/python/summary/impl/reservoir_test.py17
-rw-r--r--tensorflow/python/training/input.py13
-rw-r--r--tensorflow/tensorboard/TAG2
8 files changed, 175 insertions, 26 deletions
diff --git a/tensorflow/python/framework/test_util.py b/tensorflow/python/framework/test_util.py
index 0f4eb744f1..df08d5d5f0 100644
--- a/tensorflow/python/framework/test_util.py
+++ b/tensorflow/python/framework/test_util.py
@@ -44,6 +44,34 @@ from tensorflow.python.platform import logging
from tensorflow.python.util.protobuf import compare
+def assert_ops_in_graph(expected_ops, graph):
+ """Assert all expected operations are found.
+
+ Args:
+ expected_ops: `dict<string, string>` of op name to op type.
+ graph: Graph to check.
+ Returns:
+ `dict<string, node>` of node name to node.
+
+ Raises:
+ ValueError: If the expected ops are not present in the graph.
+ """
+ actual_ops = {}
+ gd = graph.as_graph_def()
+ for node in gd.node:
+ if node.name in expected_ops:
+ if expected_ops[node.name] != node.op:
+ raise ValueError(
+ "Expected op for node %s is different. %s vs %s" % (
+ node.name, expected_ops[node.name], node.op))
+ actual_ops[node.name] = node
+ if set(expected_ops.keys()) != set(actual_ops.keys()):
+ raise ValueError(
+ "Not all expected ops are present. Expected %s, found %s" % (
+ expected_ops.keys(), actual_ops.keys()))
+ return actual_ops
+
+
def IsGoogleCudaEnabled():
return pywrap_tensorflow.IsGoogleCudaEnabled()
diff --git a/tensorflow/python/framework/test_util_test.py b/tensorflow/python/framework/test_util_test.py
index 69af7640b6..88ca99600e 100644
--- a/tensorflow/python/framework/test_util_test.py
+++ b/tensorflow/python/framework/test_util_test.py
@@ -27,15 +27,29 @@ from six.moves import xrange # pylint: disable=redefined-builtin
from google.protobuf import text_format
from tensorflow.core.framework import graph_pb2
-from tensorflow.python.framework import dtypes
from tensorflow.python.framework import errors
from tensorflow.python.framework import ops
from tensorflow.python.framework import test_util
from tensorflow.python.platform import googletest
+from tensorflow.python.ops import constant_op
from tensorflow.python.ops import logging_ops
+
class TestUtilTest(test_util.TensorFlowTestCase):
+ def test_assert_ops_in_graph(self):
+ with self.test_session():
+ constant_op.constant(["hello", "taffy"], name="hello")
+ test_util.assert_ops_in_graph({"hello": "Const"}, ops.get_default_graph())
+
+ self.assertRaises(
+ ValueError, test_util.assert_ops_in_graph, {"bye": "Const"},
+ ops.get_default_graph())
+
+ self.assertRaises(
+ ValueError, test_util.assert_ops_in_graph, {"hello": "Variable"},
+ ops.get_default_graph())
+
def testIsGoogleCudaEnabled(self):
# The test doesn't assert anything. It ensures the py wrapper
# function is generated correctly.
diff --git a/tensorflow/python/summary/event_accumulator.py b/tensorflow/python/summary/event_accumulator.py
index 56bcd15d2b..95e8e179f4 100644
--- a/tensorflow/python/summary/event_accumulator.py
+++ b/tensorflow/python/summary/event_accumulator.py
@@ -178,25 +178,7 @@ class EventAccumulator(object):
## TODO(danmane): Have this check for restart events explicitly
if (event.step < self.most_recent_step and
event.HasField('summary')):
-
- ## Keep data in reservoirs that has a step less than event.step
- _NotExpired = lambda x: x.step < event.step
- num_expired_scalars = self._scalars.FilterItems(_NotExpired)
- num_expired_histograms = self._histograms.FilterItems(_NotExpired)
- num_expired_compressed_histograms = self._compressed_histograms.FilterItems(
- _NotExpired)
- num_expired_images = self._images.FilterItems(_NotExpired)
-
- purge_msg = (
- 'Detected out of order event.step likely caused by a Tensorflow '
- 'restart. Purging expired events from Tensorboard display '
- 'between the previous step: {} (timestamp: {}) and current step:'
- ' {} (timestamp: {}). Removing {} scalars, {} histograms, {} '
- 'compressed histograms, and {} images.').format(
- self.most_recent_step, self.most_recent_wall_time, event.step,
- event.wall_time, num_expired_scalars, num_expired_histograms,
- num_expired_compressed_histograms, num_expired_images)
- logging.warn(purge_msg)
+ self._Purge(event)
else:
self.most_recent_step = event.step
self.most_recent_wall_time = event.wall_time
@@ -470,6 +452,53 @@ class EventAccumulator(object):
)
self._images.AddItem(tag, event)
+ def _Purge(self, event):
+ """Purge all events that have occurred after the given event.step.
+
+ Purge all events that occurred after the given event.step, but only for
+ the tags that the event has. Non-sequential event.steps suggest that a
+ Tensorflow restart occured, and we discard the out-of-order events to
+ display a consistent view in TensorBoard.
+
+ Previously, the purge logic discarded all events after event.step (not just
+ within the affected tags), but this caused problems where race conditions in
+ supervisor caused many events to be unintentionally discarded.
+
+ Args:
+ event: The event to use as reference for the purge. All events with
+ the same tags, but with a greater event.step will be purged.
+ """
+
+ def _GetExpiredList(value):
+ ## Keep data in reservoirs that has a step less than event.step
+ _NotExpired = lambda x: x.step < event.step
+ return [x.FilterItems(_NotExpired, value.tag)
+ for x in (self._scalars, self._histograms,
+ self._compressed_histograms, self._images)]
+
+ expired_per_tag = [_GetExpiredList(value) for value in event.summary.value]
+ expired_per_type = [sum(x) for x in zip(*expired_per_tag)]
+
+ if sum(expired_per_type) > 0:
+ purge_msg = _GetPurgeMessage(self.most_recent_step,
+ self.most_recent_wall_time, event.step,
+ event.wall_time, *expired_per_type)
+ logging.warn(purge_msg)
+
+
+def _GetPurgeMessage(most_recent_step, most_recent_wall_time, event_step,
+ event_wall_time, num_expired_scalars, num_expired_histos,
+ num_expired_comp_histos, num_expired_images):
+ """Return the string message associated with TensorBoard purges."""
+ return ('Detected out of order event.step likely caused by '
+ 'a TensorFlow restart. Purging expired events from Tensorboard'
+ ' display between the previous step: {} (timestamp: {}) and '
+ 'current step: {} (timestamp: {}). Removing {} scalars, {} '
+ 'histograms, {} compressed histograms, and {} images.').format(
+ most_recent_step, most_recent_wall_time, event_step,
+ event_wall_time, num_expired_scalars, num_expired_histos,
+ num_expired_comp_histos, num_expired_images)
+
def _GeneratorFromPath(path):
"""Create an event generator for file or directory at given path string."""
diff --git a/tensorflow/python/summary/event_accumulator_test.py b/tensorflow/python/summary/event_accumulator_test.py
index 394a6d2290..3684bfc369 100644
--- a/tensorflow/python/summary/event_accumulator_test.py
+++ b/tensorflow/python/summary/event_accumulator_test.py
@@ -26,6 +26,8 @@ import tensorflow as tf
from tensorflow.core.framework import graph_pb2
from tensorflow.python.platform import gfile
+from tensorflow.python.platform import googletest
+from tensorflow.python.platform import logging
from tensorflow.python.summary import event_accumulator as ea
@@ -94,6 +96,7 @@ class MockingEventAccumulatorTest(EventAccumulatorTest):
def setUp(self):
super(MockingEventAccumulatorTest, self).setUp()
+ self.stubs = googletest.StubOutForTesting()
self.empty = {ea.IMAGES: [],
ea.SCALARS: [],
ea.HISTOGRAMS: [],
@@ -107,6 +110,7 @@ class MockingEventAccumulatorTest(EventAccumulatorTest):
ea.EventAccumulator = _FakeAccumulatorConstructor
def tearDown(self):
+ self.stubs.CleanUp()
ea.EventAccumulator = self._real_constructor
ea._GeneratorFromPath = self._real_generator
@@ -376,6 +380,9 @@ class MockingEventAccumulatorTest(EventAccumulatorTest):
If a step value is observed to be lower than what was previously seen,
this should force a discard of all previous items that are outdated.
"""
+ warnings = []
+ self.stubs.Set(logging, 'warn', warnings.append)
+
gen = _EventGenerator()
acc = ea.EventAccumulator(gen)
gen.AddScalar('s1', wall_time=1, step=100, value=20)
@@ -389,8 +396,45 @@ class MockingEventAccumulatorTest(EventAccumulatorTest):
gen.AddScalar('s1', wall_time=1, step=201, value=20)
gen.AddScalar('s1', wall_time=1, step=301, value=20)
acc.Reload()
+ ## Check that we have discarded 200 and 300 from s1
+ self.assertEqual([x.step for x in acc.Scalars('s1')], [100, 101, 201, 301])
+
+ ## Check that the logging message is correct
+ self.assertEqual(warnings, [ea._GetPurgeMessage(300, 1, 101, 1, 2, 0, 0, 0)
+ ])
+
+ def testEventsDiscardedPerTagAfterRestart(self):
+ """Tests that event discards after restart, only affect the misordered tag.
+
+ If a step value is observed to be lower than what was previously seen,
+ this should force a discard of all previous items that are outdated, but
+ only for the out of order tag. Other tags should remain unaffected.
+ """
+ warnings = []
+ self.stubs.Set(logging, 'warn', warnings.append)
+
+ gen = _EventGenerator()
+ acc = ea.EventAccumulator(gen)
+ gen.AddScalar('s1', wall_time=1, step=100, value=20)
+ gen.AddScalar('s1', wall_time=1, step=200, value=20)
+ gen.AddScalar('s1', wall_time=1, step=300, value=20)
+ gen.AddScalar('s1', wall_time=1, step=101, value=20)
+ gen.AddScalar('s1', wall_time=1, step=201, value=20)
+ gen.AddScalar('s1', wall_time=1, step=301, value=20)
+
+ gen.AddScalar('s2', wall_time=1, step=101, value=20)
+ gen.AddScalar('s2', wall_time=1, step=201, value=20)
+ gen.AddScalar('s2', wall_time=1, step=301, value=20)
+
+ acc.Reload()
## Check that we have discarded 200 and 300
self.assertEqual([x.step for x in acc.Scalars('s1')], [100, 101, 201, 301])
+ self.assertEqual(warnings, [ea._GetPurgeMessage(300, 1, 101, 1, 2, 0, 0, 0)
+ ])
+
+ ## Check that s1 discards do not affect s2
+ ## i.e. check that only events from the out of order tag are discarded
+ self.assertEqual([x.step for x in acc.Scalars('s2')], [101, 201, 301])
def testOnlySummaryEventsTriggerDiscards(self):
"""Test that file version event doesnt trigger data purge."""
diff --git a/tensorflow/python/summary/impl/reservoir.py b/tensorflow/python/summary/impl/reservoir.py
index c5b5daff0c..e9567de3b2 100644
--- a/tensorflow/python/summary/impl/reservoir.py
+++ b/tensorflow/python/summary/impl/reservoir.py
@@ -117,18 +117,26 @@ class Reservoir(object):
bucket = self._buckets[key]
bucket.AddItem(item)
- def FilterItems(self, filterFn):
+ def FilterItems(self, filterFn, key=None):
"""Filter items within a Reservoir, using a filtering function.
Args:
filterFn: A function that returns True for the items to be kept.
+ key: An optional bucket key to filter. If not specified, will filter all
+ all buckets.
Returns:
The number of items removed.
"""
with self._mutex:
- return sum(bucket.FilterItems(filterFn)
- for bucket in self._buckets.values())
+ if key:
+ if key in self._buckets:
+ return self._buckets[key].FilterItems(filterFn)
+ else:
+ return 0
+ else:
+ return sum(bucket.FilterItems(filterFn)
+ for bucket in self._buckets.values())
class _ReservoirBucket(object):
diff --git a/tensorflow/python/summary/impl/reservoir_test.py b/tensorflow/python/summary/impl/reservoir_test.py
index b7f72e64de..a0065cef3c 100644
--- a/tensorflow/python/summary/impl/reservoir_test.py
+++ b/tensorflow/python/summary/impl/reservoir_test.py
@@ -94,6 +94,23 @@ class ReservoirTest(tf.test.TestCase):
r2.AddItem('key', i)
self.assertNotEqual(r1.Items(key), r2.Items(key))
+ def testFilterItemsByKey(self):
+ r = reservoir.Reservoir(100, seed=0)
+ for i in xrange(10):
+ r.AddItem('key1', i)
+ r.AddItem('key2', i)
+
+ self.assertEqual(len(r.Items('key1')), 10)
+ self.assertEqual(len(r.Items('key2')), 10)
+
+ self.assertEqual(r.FilterItems(lambda x: x <= 7, 'key2'), 2)
+ self.assertEqual(len(r.Items('key2')), 8)
+ self.assertEqual(len(r.Items('key1')), 10)
+
+ self.assertEqual(r.FilterItems(lambda x: x <= 3, 'key1'), 6)
+ self.assertEqual(len(r.Items('key1')), 4)
+ self.assertEqual(len(r.Items('key2')), 8)
+
class ReservoirBucketTest(tf.test.TestCase):
diff --git a/tensorflow/python/training/input.py b/tensorflow/python/training/input.py
index ae9df87dc1..88449890f7 100644
--- a/tensorflow/python/training/input.py
+++ b/tensorflow/python/training/input.py
@@ -28,7 +28,6 @@ from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.framework import tensor_shape
from tensorflow.python.ops import array_ops
-from tensorflow.python.ops import math_ops
from tensorflow.python.ops import constant_op
from tensorflow.python.ops import data_flow_ops
from tensorflow.python.ops import io_ops
@@ -60,12 +59,15 @@ def limit_epochs(tensor, num_epochs=None, name=None):
Args:
tensor: Any `Tensor`.
- num_epochs: An integer (optional). If specified, limits the number
+ num_epochs: A positive integer (optional). If specified, limits the number
of steps the output tensor may be evaluated.
name: A name for the operations (optional).
Returns:
tensor or `OutOfRange`.
+
+ Raises:
+ ValueError: if `num_epochs` is invalid.
"""
if num_epochs is None:
return tensor
@@ -176,6 +178,8 @@ def slice_input_producer(tensor_list, num_epochs=None, shuffle=True, seed=None,
produces each slice `num_epochs` times before generating
an `OutOfRange` error. If not specified, `slice_input_producer` can cycle
through the slices an unlimited number of times.
+ shuffle: Boolean. If true, the integers are randomly shuffled within each
+ epoch.
seed: An integer (optional). Seed used if shuffle == True.
capacity: An integer. Sets the queue capacity.
name: A name for the operations (optional).
@@ -184,6 +188,9 @@ def slice_input_producer(tensor_list, num_epochs=None, shuffle=True, seed=None,
A list of tensors, one for each element of `tensor_list`. If the tensor
in `tensor_list` has shape `[N, a, b, .., z]`, then the corresponding output
tensor will have shape `[a, b, ..., z]`.
+
+ Raises:
+ ValueError: if `slice_input_producer` produces nothing from `tensor_list`.
"""
with ops.op_scope(tensor_list, name, "input_producer"):
tensor_list = ops.convert_n_to_tensor_or_indexed_slices(tensor_list)
@@ -202,6 +209,7 @@ def slice_input_producer(tensor_list, num_epochs=None, shuffle=True, seed=None,
# Helpers for the batching functions ------------------------------------------
+
def _flatten(tensor_list_list):
return [tensor for tensor_list in tensor_list_list for tensor in tensor_list]
@@ -270,6 +278,7 @@ def _enqueue(queue, tensor_list, threads, enqueue_many):
# Batching functions ----------------------------------------------------------
+
def batch(tensor_list, batch_size, num_threads=1, capacity=32,
enqueue_many=False, shapes=None, name=None):
"""Creates batches of tensors in `tensor_list`.
diff --git a/tensorflow/tensorboard/TAG b/tensorflow/tensorboard/TAG
index f599e28b8a..b4de394767 100644
--- a/tensorflow/tensorboard/TAG
+++ b/tensorflow/tensorboard/TAG
@@ -1 +1 @@
-10
+11