path: root/tensorflow/python/summary
author     Dandelion Mané <dandelion@google.com>    2017-03-08 15:02:19 -0800
committer  TensorFlower Gardener <gardener@tensorflow.org>    2017-03-08 15:29:33 -0800
commit     8c1c861ccc488497ad44bb8ec7b1b49ff5ef0a2c
tree       74a2ef4f0efbd1b0897aa03b65d29666e2777ba8    /tensorflow/python/summary
parent     0b044236797eb3b2adb437c4f82d0897bceb554e
Move TensorBoard backend logic from python/summary to tensorboard/backend.
The EventMultiplexer and EventAccumulator classes are fairly tightly tied to TensorBoard, so we should migrate them to the tensorboard/ folder in preparation for extracting TensorBoard from the TensorFlow repo.
Change: 149587771
Diffstat (limited to 'tensorflow/python/summary')
-rw-r--r--  tensorflow/python/summary/event_accumulator.py            812
-rw-r--r--  tensorflow/python/summary/event_accumulator_test.py       980
-rw-r--r--  tensorflow/python/summary/event_file_inspector.py         427
-rw-r--r--  tensorflow/python/summary/event_file_inspector_test.py    172
-rw-r--r--  tensorflow/python/summary/event_multiplexer.py            428
-rw-r--r--  tensorflow/python/summary/event_multiplexer_test.py       319
-rw-r--r--  tensorflow/python/summary/impl/__init__.py                  0
-rw-r--r--  tensorflow/python/summary/impl/directory_watcher.py       254
-rw-r--r--  tensorflow/python/summary/impl/directory_watcher_test.py  209
-rw-r--r--  tensorflow/python/summary/impl/event_file_loader.py        80
-rw-r--r--  tensorflow/python/summary/impl/event_file_loader_test.py   92
-rw-r--r--  tensorflow/python/summary/impl/io_wrapper.py               52
-rw-r--r--  tensorflow/python/summary/impl/reservoir.py               253
-rw-r--r--  tensorflow/python/summary/impl/reservoir_test.py          279
14 files changed, 0 insertions, 4357 deletions
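
For orientation, the following is a minimal, hypothetical sketch of how the EventAccumulator removed by this change is typically driven. The method names come from the class docstring in the diff below; the import uses the pre-move path that this commit deletes, and the log directory and tag name are placeholders.

    from tensorflow.python.summary import event_accumulator as ea

    # Point the accumulator at a directory of tfevents files (placeholder path).
    acc = ea.EventAccumulator('/tmp/my_run')
    acc.Reload()                        # synchronously load all events written so far
    print(acc.Tags()[ea.SCALARS])       # tags seen for scalar summaries, e.g. ['loss']
    for scalar in acc.Scalars('loss'):  # each item is a ScalarEvent(wall_time, step, value)
      print(scalar.step, scalar.value)
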
diff --git a/tensorflow/python/summary/event_accumulator.py b/tensorflow/python/summary/event_accumulator.py
deleted file mode 100644
index 23408705bd..0000000000
--- a/tensorflow/python/summary/event_accumulator.py
+++ /dev/null
@@ -1,812 +0,0 @@
-# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ==============================================================================
-"""Takes a generator of values, and accumulates them for a frontend."""
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-import collections
-import os
-import re
-import threading
-
-import numpy as np
-
-from tensorflow.core.framework import graph_pb2
-from tensorflow.core.protobuf import meta_graph_pb2
-from tensorflow.core.protobuf.config_pb2 import RunMetadata
-from tensorflow.core.util.event_pb2 import SessionLog
-from tensorflow.python.platform import tf_logging as logging
-from tensorflow.python.summary.impl import directory_watcher
-from tensorflow.python.summary.impl import event_file_loader
-from tensorflow.python.summary.impl import reservoir
-from tensorflow.python.util import compat
-
-namedtuple = collections.namedtuple
-ScalarEvent = namedtuple('ScalarEvent', ['wall_time', 'step', 'value'])
-
-HealthPillEvent = namedtuple(
- 'HealthPillEvent',
- ['wall_time', 'step', 'node_name', 'output_slot', 'value'])
-
-CompressedHistogramEvent = namedtuple('CompressedHistogramEvent',
- ['wall_time', 'step',
- 'compressed_histogram_values'])
-
-CompressedHistogramValue = namedtuple('CompressedHistogramValue',
- ['basis_point', 'value'])
-
-HistogramEvent = namedtuple('HistogramEvent',
- ['wall_time', 'step', 'histogram_value'])
-
-HistogramValue = namedtuple('HistogramValue', ['min', 'max', 'num', 'sum',
- 'sum_squares', 'bucket_limit',
- 'bucket'])
-
-ImageEvent = namedtuple('ImageEvent', ['wall_time', 'step',
- 'encoded_image_string', 'width',
- 'height'])
-
-AudioEvent = namedtuple('AudioEvent', ['wall_time', 'step',
- 'encoded_audio_string', 'content_type',
- 'sample_rate', 'length_frames'])
-
-TensorEvent = namedtuple('TensorEvent', ['wall_time', 'step', 'tensor_proto'])
-
-## Different types of summary events handled by the event_accumulator
-SUMMARY_TYPES = {
- 'simple_value': '_ProcessScalar',
- 'histo': '_ProcessHistogram',
- 'image': '_ProcessImage',
- 'audio': '_ProcessAudio',
- 'tensor': '_ProcessTensor',
-}
-
-## The tagTypes below are just arbitrary strings chosen to pass the type
-## information of the tag from the backend to the frontend
-COMPRESSED_HISTOGRAMS = 'compressedHistograms'
-HISTOGRAMS = 'histograms'
-IMAGES = 'images'
-AUDIO = 'audio'
-SCALARS = 'scalars'
-TENSORS = 'tensors'
-HEALTH_PILLS = 'health_pills'
-GRAPH = 'graph'
-META_GRAPH = 'meta_graph'
-RUN_METADATA = 'run_metadata'
-
-## Normal CDF for std_devs: (-Inf, -1.5, -1, -0.5, 0, 0.5, 1, 1.5, Inf)
-## naturally gives bands around median of width 1 std dev, 2 std dev, 3 std dev,
-## and then the long tail.
-NORMAL_HISTOGRAM_BPS = (0, 668, 1587, 3085, 5000, 6915, 8413, 9332, 10000)
-
-DEFAULT_SIZE_GUIDANCE = {
- COMPRESSED_HISTOGRAMS: 500,
- IMAGES: 4,
- AUDIO: 4,
- SCALARS: 10000,
- # We store this many health pills per op.
- HEALTH_PILLS: 100,
- HISTOGRAMS: 1,
- TENSORS: 10,
-}
-
-STORE_EVERYTHING_SIZE_GUIDANCE = {
- COMPRESSED_HISTOGRAMS: 0,
- IMAGES: 0,
- AUDIO: 0,
- SCALARS: 0,
- HEALTH_PILLS: 0,
- HISTOGRAMS: 0,
- TENSORS: 0,
-}
-
-# The tag that values containing health pills have. Health pill data is stored
-# in tensors. To distinguish health pill values from scalar values, we rely on
-# the fact that health pill values carry this special tag.
-_HEALTH_PILL_EVENT_TAG = '__health_pill__'
-
-
-def IsTensorFlowEventsFile(path):
- """Check the path name to see if it is probably a TF Events file.
-
- Args:
- path: A file path to check if it is an event file.
-
- Raises:
- ValueError: If the path is an empty string.
-
- Returns:
-    True if the path is formatted like a TensorFlow events file, else False.
- """
- if not path:
- raise ValueError('Path must be a nonempty string')
- return 'tfevents' in compat.as_str_any(os.path.basename(path))
-
-
-class EventAccumulator(object):
- """An `EventAccumulator` takes an event generator, and accumulates the values.
-
- The `EventAccumulator` is intended to provide a convenient Python interface
- for loading Event data written during a TensorFlow run. TensorFlow writes out
- `Event` protobuf objects, which have a timestamp and step number, and often
- contain a `Summary`. Summaries can have different kinds of data like an image,
- a scalar value, or a histogram. The Summaries also have a tag, which we use to
- organize logically related data. The `EventAccumulator` supports retrieving
- the `Event` and `Summary` data by its tag.
-
- Calling `Tags()` gets a map from `tagType` (e.g. `'images'`,
- `'compressedHistograms'`, `'scalars'`, etc) to the associated tags for those
-  data types. Then, various functional endpoints (e.g.
-  `Accumulator.Scalars(tag)`) allow for the retrieval of all data
- associated with that tag.
-
- The `Reload()` method synchronously loads all of the data written so far.
-
- Histograms, audio, and images are very large, so storing all of them is not
- recommended.
- @@Tensors
- """
-
- def __init__(self,
- path,
- size_guidance=DEFAULT_SIZE_GUIDANCE,
- compression_bps=NORMAL_HISTOGRAM_BPS,
- purge_orphaned_data=True):
- """Construct the `EventAccumulator`.
-
- Args:
- path: A file path to a directory containing tf events files, or a single
- tf events file. The accumulator will load events from this path.
- size_guidance: Information on how much data the EventAccumulator should
- store in memory. The DEFAULT_SIZE_GUIDANCE tries not to store too much
- so as to avoid OOMing the client. The size_guidance should be a map
- from a `tagType` string to an integer representing the number of
- items to keep per tag for items of that `tagType`. If the size is 0,
- all events are stored.
- compression_bps: Information on how the `EventAccumulator` should compress
- histogram data for the `CompressedHistograms` tag (for details see
-        `_CompressHistogram`).
- purge_orphaned_data: Whether to discard any events that were "orphaned" by
- a TensorFlow restart.
- """
- sizes = {}
- for key in DEFAULT_SIZE_GUIDANCE:
- if key in size_guidance:
- sizes[key] = size_guidance[key]
- else:
- sizes[key] = DEFAULT_SIZE_GUIDANCE[key]
-
- self._first_event_timestamp = None
- self._scalars = reservoir.Reservoir(size=sizes[SCALARS])
-
-    # Unlike the other reservoirs, the reservoir for health pills is keyed by the
- # name of the op instead of the tag. This lets us efficiently obtain the
- # health pills per node.
- self._health_pills = reservoir.Reservoir(size=sizes[HEALTH_PILLS])
-
- self._graph = None
- self._graph_from_metagraph = False
- self._meta_graph = None
- self._tagged_metadata = {}
- self._histograms = reservoir.Reservoir(size=sizes[HISTOGRAMS])
- self._compressed_histograms = reservoir.Reservoir(
- size=sizes[COMPRESSED_HISTOGRAMS], always_keep_last=False)
- self._images = reservoir.Reservoir(size=sizes[IMAGES])
- self._audio = reservoir.Reservoir(size=sizes[AUDIO])
- self._tensors = reservoir.Reservoir(size=sizes[TENSORS])
-
- self._generator_mutex = threading.Lock()
- self._generator = _GeneratorFromPath(path)
-
- self._compression_bps = compression_bps
- self.purge_orphaned_data = purge_orphaned_data
-
- self.most_recent_step = -1
- self.most_recent_wall_time = -1
- self.file_version = None
-
- # The attributes that get built up by the accumulator
- self.accumulated_attrs = ('_scalars', '_histograms',
- '_compressed_histograms', '_images', '_audio')
- self._tensor_summaries = {}
-
- def Reload(self):
- """Loads all events added since the last call to `Reload`.
-
- If `Reload` was never called, loads all events in the file.
-
- Returns:
- The `EventAccumulator`.
- """
- with self._generator_mutex:
- for event in self._generator.Load():
- self._ProcessEvent(event)
- return self
-
- def FirstEventTimestamp(self):
- """Returns the timestamp in seconds of the first event.
-
-    If the first event has been loaded (either by this method or by `Reload`),
- this returns immediately. Otherwise, it will load in the first event. Note
- that this means that calling `Reload` will cause this to block until
- `Reload` has finished.
-
- Returns:
- The timestamp in seconds of the first event that was loaded.
-
- Raises:
- ValueError: If no events have been loaded and there were no events found
- on disk.
- """
- if self._first_event_timestamp is not None:
- return self._first_event_timestamp
- with self._generator_mutex:
- try:
- event = next(self._generator.Load())
- self._ProcessEvent(event)
- return self._first_event_timestamp
-
- except StopIteration:
- raise ValueError('No event timestamp could be found')
-
- def _ProcessEvent(self, event):
- """Called whenever an event is loaded."""
- if self._first_event_timestamp is None:
- self._first_event_timestamp = event.wall_time
-
- if event.HasField('file_version'):
- new_file_version = _ParseFileVersion(event.file_version)
- if self.file_version and self.file_version != new_file_version:
- ## This should not happen.
- logging.warn(('Found new file_version for event.proto. This will '
- 'affect purging logic for TensorFlow restarts. '
- 'Old: {0} New: {1}').format(self.file_version,
- new_file_version))
- self.file_version = new_file_version
-
- self._MaybePurgeOrphanedData(event)
-
- ## Process the event.
- # GraphDef and MetaGraphDef are handled in a special way:
- # If no graph_def Event is available, but a meta_graph_def is, and it
- # contains a graph_def, then use the meta_graph_def.graph_def as our graph.
- # If a graph_def Event is available, always prefer it to the graph_def
- # inside the meta_graph_def.
- if event.HasField('graph_def'):
- if self._graph is not None:
- logging.warn(('Found more than one graph event per run, or there was '
- 'a metagraph containing a graph_def, as well as one or '
- 'more graph events. Overwriting the graph with the '
- 'newest event.'))
- self._graph = event.graph_def
- self._graph_from_metagraph = False
- elif event.HasField('meta_graph_def'):
- if self._meta_graph is not None:
- logging.warn(('Found more than one metagraph event per run. '
- 'Overwriting the metagraph with the newest event.'))
- self._meta_graph = event.meta_graph_def
- if self._graph is None or self._graph_from_metagraph:
- # We may have a graph_def in the metagraph. If so, and no
- # graph_def is directly available, use this one instead.
- meta_graph = meta_graph_pb2.MetaGraphDef()
- meta_graph.ParseFromString(self._meta_graph)
- if meta_graph.graph_def:
- if self._graph is not None:
-          logging.warn(('Found multiple metagraphs containing graph_defs, '
- 'but did not find any graph events. Overwriting the '
- 'graph with the newest metagraph version.'))
- self._graph_from_metagraph = True
- self._graph = meta_graph.graph_def.SerializeToString()
- elif event.HasField('tagged_run_metadata'):
- tag = event.tagged_run_metadata.tag
- if tag in self._tagged_metadata:
- logging.warn('Found more than one "run metadata" event with tag ' +
- tag + '. Overwriting it with the newest event.')
- self._tagged_metadata[tag] = event.tagged_run_metadata.run_metadata
- elif event.HasField('summary'):
- for value in event.summary.value:
- if value.HasField('tensor') and value.tag == _HEALTH_PILL_EVENT_TAG:
- self._ProcessHealthPillSummary(value, event)
- else:
- for summary_type, summary_func in SUMMARY_TYPES.items():
- if value.HasField(summary_type):
- datum = getattr(value, summary_type)
- tag = value.node_name if summary_type == 'tensor' else value.tag
- getattr(self, summary_func)(tag, event.wall_time, event.step,
- datum)
-
- def _ProcessHealthPillSummary(self, value, event):
- """Process summaries containing health pills.
-
- These summaries are distinguished by the fact that they have a Tensor field
- and have a special tag value.
-
- This method emits ERROR-level messages to the logs if it encounters Tensor
- summaries that it cannot process.
-
- Args:
- value: A summary_pb2.Summary.Value with a Tensor field.
- event: The event_pb2.Event containing that value.
- """
- elements = np.fromstring(value.tensor.tensor_content, dtype=np.float64)
-
- # The node_name property of the value object is actually a watch key: a
- # combination of node name, output slot, and a suffix. We capture the
- # actual node name and the output slot with a regular expression.
- match = re.match(r'^(.*):(\d+):DebugNumericSummary$', value.node_name)
- if not match:
- logging.log_first_n(
- logging.ERROR,
- 'Unsupported watch key %s for health pills; skipping this sequence.',
- 1,
- value.node_name)
- return
-
- node_name = match.group(1)
- output_slot = int(match.group(2))
- self._ProcessHealthPill(
- event.wall_time, event.step, node_name, output_slot, elements)
-
- def Tags(self):
- """Return all tags found in the value stream.
-
- Returns:
- A `{tagType: ['list', 'of', 'tags']}` dictionary.
- """
- return {
- IMAGES: self._images.Keys(),
- AUDIO: self._audio.Keys(),
- HISTOGRAMS: self._histograms.Keys(),
- SCALARS: self._scalars.Keys(),
- COMPRESSED_HISTOGRAMS: self._compressed_histograms.Keys(),
- TENSORS: self._tensors.Keys(),
- # Use a heuristic: if the metagraph is available, but
- # graph is not, then we assume the metagraph contains the graph.
- GRAPH: self._graph is not None,
- META_GRAPH: self._meta_graph is not None,
- RUN_METADATA: list(self._tagged_metadata.keys())
- }
-
- def Scalars(self, tag):
- """Given a summary tag, return all associated `ScalarEvent`s.
-
- Args:
- tag: A string tag associated with the events.
-
- Raises:
- KeyError: If the tag is not found.
-
- Returns:
- An array of `ScalarEvent`s.
- """
- return self._scalars.Items(tag)
-
- def HealthPills(self, node_name):
- """Returns all health pill values for a certain node.
-
- Args:
- node_name: The name of the node to obtain health pills for.
-
- Raises:
- KeyError: If the node name is not found.
-
- Returns:
- An array of `HealthPillEvent`s.
- """
- return self._health_pills.Items(node_name)
-
- def Graph(self):
- """Return the graph definition, if there is one.
-
- If the graph is stored directly, return that. If no graph is stored
- directly but a metagraph is stored containing a graph, return that.
-
- Raises:
- ValueError: If there is no graph for this run.
-
- Returns:
- The `graph_def` proto.
- """
- graph = graph_pb2.GraphDef()
- if self._graph is not None:
- graph.ParseFromString(self._graph)
- return graph
- raise ValueError('There is no graph in this EventAccumulator')
-
- def MetaGraph(self):
- """Return the metagraph definition, if there is one.
-
- Raises:
- ValueError: If there is no metagraph for this run.
-
- Returns:
- The `meta_graph_def` proto.
- """
- if self._meta_graph is None:
- raise ValueError('There is no metagraph in this EventAccumulator')
- meta_graph = meta_graph_pb2.MetaGraphDef()
- meta_graph.ParseFromString(self._meta_graph)
- return meta_graph
-
- def RunMetadata(self, tag):
- """Given a tag, return the associated session.run() metadata.
-
- Args:
- tag: A string tag associated with the event.
-
- Raises:
- ValueError: If the tag is not found.
-
- Returns:
- The metadata in form of `RunMetadata` proto.
- """
- if tag not in self._tagged_metadata:
- raise ValueError('There is no run metadata with this tag name')
-
- run_metadata = RunMetadata()
- run_metadata.ParseFromString(self._tagged_metadata[tag])
- return run_metadata
-
- def Histograms(self, tag):
- """Given a summary tag, return all associated histograms.
-
- Args:
- tag: A string tag associated with the events.
-
- Raises:
- KeyError: If the tag is not found.
-
- Returns:
- An array of `HistogramEvent`s.
- """
- return self._histograms.Items(tag)
-
- def CompressedHistograms(self, tag):
- """Given a summary tag, return all associated compressed histograms.
-
- Args:
- tag: A string tag associated with the events.
-
- Raises:
- KeyError: If the tag is not found.
-
- Returns:
- An array of `CompressedHistogramEvent`s.
- """
- return self._compressed_histograms.Items(tag)
-
- def Images(self, tag):
- """Given a summary tag, return all associated images.
-
- Args:
- tag: A string tag associated with the events.
-
- Raises:
- KeyError: If the tag is not found.
-
- Returns:
- An array of `ImageEvent`s.
- """
- return self._images.Items(tag)
-
- def Audio(self, tag):
- """Given a summary tag, return all associated audio.
-
- Args:
- tag: A string tag associated with the events.
-
- Raises:
- KeyError: If the tag is not found.
-
- Returns:
- An array of `AudioEvent`s.
- """
- return self._audio.Items(tag)
-
- def Tensors(self, tag):
- """Given a summary tag, return all associated tensors.
-
- Args:
- tag: A string tag associated with the events.
-
- Raises:
- KeyError: If the tag is not found.
-
- Returns:
- An array of `TensorEvent`s.
- """
- return self._tensors.Items(tag)
-
- def _MaybePurgeOrphanedData(self, event):
- """Maybe purge orphaned data due to a TensorFlow crash.
-
- When TensorFlow crashes at step T+O and restarts at step T, any events
- written after step T are now "orphaned" and will be at best misleading if
- they are included in TensorBoard.
-
- This logic attempts to determine if there is orphaned data, and purge it
- if it is found.
-
- Args:
- event: The event to use as a reference, to determine if a purge is needed.
- """
- if not self.purge_orphaned_data:
- return
- ## Check if the event happened after a crash, and purge expired tags.
- if self.file_version and self.file_version >= 2:
- ## If the file_version is recent enough, use the SessionLog enum
- ## to check for restarts.
- self._CheckForRestartAndMaybePurge(event)
- else:
- ## If there is no file version, default to old logic of checking for
- ## out of order steps.
- self._CheckForOutOfOrderStepAndMaybePurge(event)
-
- def _CheckForRestartAndMaybePurge(self, event):
- """Check and discard expired events using SessionLog.START.
-
- Check for a SessionLog.START event and purge all previously seen events
- with larger steps, because they are out of date. Because of supervisor
- threading, it is possible that this logic will cause the first few event
- messages to be discarded since supervisor threading does not guarantee
- that the START message is deterministically written first.
-
- This method is preferred over _CheckForOutOfOrderStepAndMaybePurge which
- can inadvertently discard events due to supervisor threading.
-
- Args:
- event: The event to use as reference. If the event is a START event, all
- previously seen events with a greater event.step will be purged.
- """
- if event.HasField(
- 'session_log') and event.session_log.status == SessionLog.START:
- self._Purge(event, by_tags=False)
-
- def _CheckForOutOfOrderStepAndMaybePurge(self, event):
- """Check for out-of-order event.step and discard expired events for tags.
-
- Check if the event is out of order relative to the global most recent step.
- If it is, purge outdated summaries for tags that the event contains.
-
- Args:
- event: The event to use as reference. If the event is out-of-order, all
- events with the same tags, but with a greater event.step will be purged.
- """
- if event.step < self.most_recent_step and event.HasField('summary'):
- self._Purge(event, by_tags=True)
- else:
- self.most_recent_step = event.step
- self.most_recent_wall_time = event.wall_time
-
- def _ConvertHistogramProtoToTuple(self, histo):
- return HistogramValue(min=histo.min,
- max=histo.max,
- num=histo.num,
- sum=histo.sum,
- sum_squares=histo.sum_squares,
- bucket_limit=list(histo.bucket_limit),
- bucket=list(histo.bucket))
-
- def _ProcessHistogram(self, tag, wall_time, step, histo):
- """Processes a proto histogram by adding it to accumulated state."""
- histo = self._ConvertHistogramProtoToTuple(histo)
- histo_ev = HistogramEvent(wall_time, step, histo)
- self._histograms.AddItem(tag, histo_ev)
- self._compressed_histograms.AddItem(
- tag, histo_ev, lambda x: _CompressHistogram(x, self._compression_bps))
-
- def _ProcessImage(self, tag, wall_time, step, image):
- """Processes an image by adding it to accumulated state."""
- event = ImageEvent(wall_time=wall_time,
- step=step,
- encoded_image_string=image.encoded_image_string,
- width=image.width,
- height=image.height)
- self._images.AddItem(tag, event)
-
- def _ProcessAudio(self, tag, wall_time, step, audio):
- """Processes a audio by adding it to accumulated state."""
- event = AudioEvent(wall_time=wall_time,
- step=step,
- encoded_audio_string=audio.encoded_audio_string,
- content_type=audio.content_type,
- sample_rate=audio.sample_rate,
- length_frames=audio.length_frames)
- self._audio.AddItem(tag, event)
-
- def _ProcessScalar(self, tag, wall_time, step, scalar):
- """Processes a simple value by adding it to accumulated state."""
- sv = ScalarEvent(wall_time=wall_time, step=step, value=scalar)
- self._scalars.AddItem(tag, sv)
-
- def _ProcessTensor(self, tag, wall_time, step, tensor):
- tv = TensorEvent(wall_time=wall_time, step=step, tensor_proto=tensor)
- self._tensors.AddItem(tag, tv)
-
- def _ProcessHealthPill(self, wall_time, step, node_name, output_slot,
- elements):
- """Processes a health pill value by adding it to accumulated state.
-
- Args:
- wall_time: The time at which the health pill was created. Provided by the
- debugger.
- step: The step at which the health pill was created. Provided by the
- debugger.
- node_name: The name of the node for this health pill.
- output_slot: The output slot for this health pill.
- elements: An ND array of 12 floats. The elements of the health pill.
- """
- # Key by the node name for fast retrieval of health pills by node name. The
- # array is cast to a list so that it is JSON-able. The debugger data plugin
- # serves a JSON response.
- self._health_pills.AddItem(
- node_name,
- HealthPillEvent(
- wall_time=wall_time,
- step=step,
- node_name=node_name,
- output_slot=output_slot,
- value=list(elements)))
-
- def _Purge(self, event, by_tags):
- """Purge all events that have occurred after the given event.step.
-
- If by_tags is True, purge all events that occurred after the given
- event.step, but only for the tags that the event has. Non-sequential
- event.steps suggest that a TensorFlow restart occurred, and we discard
- the out-of-order events to display a consistent view in TensorBoard.
-
-    Discarding by tags is the safer method when we are unsure whether a restart
-    has occurred, given that threading in the supervisor can cause events of
- different tags to arrive with unsynchronized step values.
-
- If by_tags is False, then purge all events with event.step greater than the
- given event.step. This can be used when we are certain that a TensorFlow
- restart has occurred and these events can be discarded.
-
- Args:
- event: The event to use as reference for the purge. All events with
- the same tags, but with a greater event.step will be purged.
- by_tags: Bool to dictate whether to discard all out-of-order events or
- only those that are associated with the given reference event.
- """
- ## Keep data in reservoirs that has a step less than event.step
- _NotExpired = lambda x: x.step < event.step
-
- if by_tags:
-
- def _ExpiredPerTag(value):
- return [getattr(self, x).FilterItems(_NotExpired, value.tag)
- for x in self.accumulated_attrs]
-
- expired_per_tags = [_ExpiredPerTag(value)
- for value in event.summary.value]
- expired_per_type = [sum(x) for x in zip(*expired_per_tags)]
- else:
- expired_per_type = [getattr(self, x).FilterItems(_NotExpired)
- for x in self.accumulated_attrs]
-
- if sum(expired_per_type) > 0:
- purge_msg = _GetPurgeMessage(self.most_recent_step,
- self.most_recent_wall_time, event.step,
- event.wall_time, *expired_per_type)
- logging.warn(purge_msg)
-
-
-def _GetPurgeMessage(most_recent_step, most_recent_wall_time, event_step,
- event_wall_time, num_expired_scalars, num_expired_histos,
- num_expired_comp_histos, num_expired_images,
- num_expired_audio):
- """Return the string message associated with TensorBoard purges."""
- return ('Detected out of order event.step likely caused by '
-          'a TensorFlow restart. Purging expired events from TensorBoard'
- ' display between the previous step: {} (timestamp: {}) and '
- 'current step: {} (timestamp: {}). Removing {} scalars, {} '
- 'histograms, {} compressed histograms, {} images, '
- 'and {} audio.').format(most_recent_step, most_recent_wall_time,
- event_step, event_wall_time,
- num_expired_scalars, num_expired_histos,
- num_expired_comp_histos, num_expired_images,
- num_expired_audio)
-
-
-def _GeneratorFromPath(path):
- """Create an event generator for file or directory at given path string."""
- if not path:
- raise ValueError('path must be a valid string')
- if IsTensorFlowEventsFile(path):
- return event_file_loader.EventFileLoader(path)
- else:
- return directory_watcher.DirectoryWatcher(
- path, event_file_loader.EventFileLoader, IsTensorFlowEventsFile)
-
-
-def _ParseFileVersion(file_version):
- """Convert the string file_version in event.proto into a float.
-
- Args:
- file_version: String file_version from event.proto
-
- Returns:
- Version number as a float.
- """
- tokens = file_version.split('brain.Event:')
- try:
- return float(tokens[-1])
- except ValueError:
- ## This should never happen according to the definition of file_version
- ## specified in event.proto.
- logging.warn(('Invalid event.proto file_version. Defaulting to use of '
- 'out-of-order event.step logic for purging expired events.'))
- return -1
-
-
-def _CompressHistogram(histo_ev, bps):
- """Creates fixed size histogram by adding compression to accumulated state.
-
- This routine transforms a histogram at a particular step by linearly
- interpolating its variable number of buckets to represent their cumulative
- weight at a constant number of compression points. This significantly reduces
- the size of the histogram and makes it suitable for a two-dimensional area
- plot where the output of this routine constitutes the ranges for a single x
- coordinate.
-
- Args:
- histo_ev: A HistogramEvent namedtuple.
- bps: Compression points represented in basis points, 1/100ths of a percent.
-
- Returns:
- CompressedHistogramEvent namedtuple.
- """
- # See also: Histogram::Percentile() in core/lib/histogram/histogram.cc
- histo = histo_ev.histogram_value
- if not histo.num:
- return CompressedHistogramEvent(
- histo_ev.wall_time,
- histo_ev.step,
- [CompressedHistogramValue(b, 0.0) for b in bps])
- bucket = np.array(histo.bucket)
- weights = (bucket * bps[-1] / (bucket.sum() or 1.0)).cumsum()
- values = []
- j = 0
- while j < len(bps):
- i = np.searchsorted(weights, bps[j], side='right')
- while i < len(weights):
- cumsum = weights[i]
- cumsum_prev = weights[i - 1] if i > 0 else 0.0
- if cumsum == cumsum_prev: # prevent remap divide by zero
- i += 1
- continue
- if not i or not cumsum_prev:
- lhs = histo.min
- else:
- lhs = max(histo.bucket_limit[i - 1], histo.min)
- rhs = min(histo.bucket_limit[i], histo.max)
- weight = _Remap(bps[j], cumsum_prev, cumsum, lhs, rhs)
- values.append(CompressedHistogramValue(bps[j], weight))
- j += 1
- break
- else:
- break
- while j < len(bps):
- values.append(CompressedHistogramValue(bps[j], histo.max))
- j += 1
- return CompressedHistogramEvent(histo_ev.wall_time, histo_ev.step, values)
-
-
-def _Remap(x, x0, x1, y0, y1):
- """Linearly map from [x0, x1] unto [y0, y1]."""
- return y0 + (x - x0) * float(y1 - y0) / (x1 - x0)
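
As an aside on the compression code above, here is a small self-contained sketch (not part of this change) of the interpolation step that _CompressHistogram performs for each compression point: the cumulative bucket weight at a basis point is linearly mapped onto the value range of the bucket that contains it, using the same formula as _Remap. The numbers are illustrative and consistent with the expectations in the unit tests below.

    def remap(x, x0, x1, y0, y1):
      """Linearly map x from [x0, x1] onto [y0, y1] (same formula as _Remap above)."""
      return y0 + (x - x0) * float(y1 - y0) / (x1 - x0)

    # Example: a bucket spanning values [1.0, 2.0] covers the cumulative weight
    # range [0, 10000] basis points, so the median compression point (5000 bps)
    # interpolates to the midpoint of the bucket.
    print(remap(5000, 0, 10000, 1.0, 2.0))  # -> 1.5
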
diff --git a/tensorflow/python/summary/event_accumulator_test.py b/tensorflow/python/summary/event_accumulator_test.py
deleted file mode 100644
index 0577486df5..0000000000
--- a/tensorflow/python/summary/event_accumulator_test.py
+++ /dev/null
@@ -1,980 +0,0 @@
-# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ==============================================================================
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-import os
-
-import numpy as np
-import six
-from six.moves import xrange # pylint: disable=redefined-builtin
-
-from tensorflow.core.framework import graph_pb2
-from tensorflow.core.framework import summary_pb2
-from tensorflow.core.protobuf import config_pb2
-from tensorflow.core.util import event_pb2
-from tensorflow.python.framework import constant_op
-from tensorflow.python.framework import dtypes
-from tensorflow.python.framework import ops
-from tensorflow.python.framework import tensor_util
-from tensorflow.python.ops import array_ops
-from tensorflow.python.platform import gfile
-from tensorflow.python.platform import googletest
-from tensorflow.python.platform import test
-from tensorflow.python.platform import tf_logging as logging
-from tensorflow.python.summary import event_accumulator as ea
-from tensorflow.python.summary import summary as summary_lib
-from tensorflow.python.summary.writer import writer as writer_lib
-from tensorflow.python.summary.writer.writer import SummaryToEventTransformer
-from tensorflow.python.training import saver
-
-
-class _EventGenerator(object):
- """Class that can add_events and then yield them back.
-
- Satisfies the EventGenerator API required for the EventAccumulator.
- Satisfies the EventWriter API required to create a SummaryWriter.
-
- Has additional convenience methods for adding test events.
- """
-
- def __init__(self, testcase, zero_out_timestamps=False):
- self._testcase = testcase
- self.items = []
- self.zero_out_timestamps = zero_out_timestamps
-
- def Load(self):
- while self.items:
- yield self.items.pop(0)
-
- def AddScalar(self, tag, wall_time=0, step=0, value=0):
- event = event_pb2.Event(
- wall_time=wall_time,
- step=step,
- summary=summary_pb2.Summary(
- value=[summary_pb2.Summary.Value(
- tag=tag, simple_value=value)]))
- self.AddEvent(event)
-
- def AddHealthPill(self, wall_time, step, node_name, output_slot, elements):
- event = event_pb2.Event()
- event.wall_time = wall_time
- event.step = step
- value = event.summary.value.add()
- # The node_name property is actually a watch key.
- value.node_name = '%s:%d:DebugNumericSummary' % (node_name, output_slot)
- value.tag = '__health_pill__'
- value.tensor.tensor_shape.dim.add().size = len(elements)
- value.tensor.tensor_content = np.array(elements, dtype=np.float64).tobytes()
- self.AddEvent(event)
-
- def AddHistogram(self,
- tag,
- wall_time=0,
- step=0,
- hmin=1,
- hmax=2,
- hnum=3,
- hsum=4,
- hsum_squares=5,
- hbucket_limit=None,
- hbucket=None):
- histo = summary_pb2.HistogramProto(
- min=hmin,
- max=hmax,
- num=hnum,
- sum=hsum,
- sum_squares=hsum_squares,
- bucket_limit=hbucket_limit,
- bucket=hbucket)
- event = event_pb2.Event(
- wall_time=wall_time,
- step=step,
- summary=summary_pb2.Summary(
- value=[summary_pb2.Summary.Value(
- tag=tag, histo=histo)]))
- self.AddEvent(event)
-
- def AddImage(self,
- tag,
- wall_time=0,
- step=0,
- encoded_image_string=b'imgstr',
- width=150,
- height=100):
- image = summary_pb2.Summary.Image(
- encoded_image_string=encoded_image_string, width=width, height=height)
- event = event_pb2.Event(
- wall_time=wall_time,
- step=step,
- summary=summary_pb2.Summary(
- value=[summary_pb2.Summary.Value(
- tag=tag, image=image)]))
- self.AddEvent(event)
-
- def AddAudio(self,
- tag,
- wall_time=0,
- step=0,
- encoded_audio_string=b'sndstr',
- content_type='audio/wav',
- sample_rate=44100,
- length_frames=22050):
- audio = summary_pb2.Summary.Audio(
- encoded_audio_string=encoded_audio_string,
- content_type=content_type,
- sample_rate=sample_rate,
- length_frames=length_frames)
- event = event_pb2.Event(
- wall_time=wall_time,
- step=step,
- summary=summary_pb2.Summary(
- value=[summary_pb2.Summary.Value(
- tag=tag, audio=audio)]))
- self.AddEvent(event)
-
- def AddEvent(self, event):
- if self.zero_out_timestamps:
- event.wall_time = 0
- self.items.append(event)
-
- def add_event(self, event): # pylint: disable=invalid-name
- """Match the EventWriter API."""
- self.AddEvent(event)
-
- def get_logdir(self): # pylint: disable=invalid-name
- """Return a temp directory for asset writing."""
- return self._testcase.get_temp_dir()
-
-
-class EventAccumulatorTest(test.TestCase):
-
- def assertTagsEqual(self, actual, expected):
- """Utility method for checking the return value of the Tags() call.
-
- It fills out the `expected` arg with the default (empty) values for every
-    tag type, so that the author need only specify the non-empty values they
- are interested in testing.
-
- Args:
- actual: The actual Accumulator tags response.
- expected: The expected tags response (empty fields may be omitted)
- """
-
- empty_tags = {
- ea.IMAGES: [],
- ea.AUDIO: [],
- ea.SCALARS: [],
- ea.HISTOGRAMS: [],
- ea.COMPRESSED_HISTOGRAMS: [],
- ea.GRAPH: False,
- ea.META_GRAPH: False,
- ea.RUN_METADATA: [],
- ea.TENSORS: [],
- }
-
- # Verifies that there are no unexpected keys in the actual response.
- # If this line fails, likely you added a new tag type, and need to update
- # the empty_tags dictionary above.
- self.assertItemsEqual(actual.keys(), empty_tags.keys())
-
- for key in actual:
- expected_value = expected.get(key, empty_tags[key])
- if isinstance(expected_value, list):
- self.assertItemsEqual(actual[key], expected_value)
- else:
- self.assertEqual(actual[key], expected_value)
-
-
-class MockingEventAccumulatorTest(EventAccumulatorTest):
-
- def setUp(self):
- super(MockingEventAccumulatorTest, self).setUp()
- self.stubs = googletest.StubOutForTesting()
- self._real_constructor = ea.EventAccumulator
- self._real_generator = ea._GeneratorFromPath
-
- def _FakeAccumulatorConstructor(generator, *args, **kwargs):
- ea._GeneratorFromPath = lambda x: generator
- return self._real_constructor(generator, *args, **kwargs)
-
- ea.EventAccumulator = _FakeAccumulatorConstructor
-
- def tearDown(self):
- self.stubs.CleanUp()
- ea.EventAccumulator = self._real_constructor
- ea._GeneratorFromPath = self._real_generator
-
- def testEmptyAccumulator(self):
- gen = _EventGenerator(self)
- x = ea.EventAccumulator(gen)
- x.Reload()
- self.assertTagsEqual(x.Tags(), {})
-
- def testTags(self):
- gen = _EventGenerator(self)
- gen.AddScalar('s1')
- gen.AddScalar('s2')
- gen.AddHistogram('hst1')
- gen.AddHistogram('hst2')
- gen.AddImage('im1')
- gen.AddImage('im2')
- gen.AddAudio('snd1')
- gen.AddAudio('snd2')
- acc = ea.EventAccumulator(gen)
- acc.Reload()
- self.assertTagsEqual(acc.Tags(), {
- ea.IMAGES: ['im1', 'im2'],
- ea.AUDIO: ['snd1', 'snd2'],
- ea.SCALARS: ['s1', 's2'],
- ea.HISTOGRAMS: ['hst1', 'hst2'],
- ea.COMPRESSED_HISTOGRAMS: ['hst1', 'hst2'],
- })
-
- def testReload(self):
- gen = _EventGenerator(self)
- acc = ea.EventAccumulator(gen)
- acc.Reload()
- self.assertTagsEqual(acc.Tags(), {})
- gen.AddScalar('s1')
- gen.AddScalar('s2')
- gen.AddHistogram('hst1')
- gen.AddHistogram('hst2')
- gen.AddImage('im1')
- gen.AddImage('im2')
- gen.AddAudio('snd1')
- gen.AddAudio('snd2')
- acc.Reload()
- self.assertTagsEqual(acc.Tags(), {
- ea.IMAGES: ['im1', 'im2'],
- ea.AUDIO: ['snd1', 'snd2'],
- ea.SCALARS: ['s1', 's2'],
- ea.HISTOGRAMS: ['hst1', 'hst2'],
- ea.COMPRESSED_HISTOGRAMS: ['hst1', 'hst2'],
- })
-
- def testScalars(self):
- gen = _EventGenerator(self)
- acc = ea.EventAccumulator(gen)
- s1 = ea.ScalarEvent(wall_time=1, step=10, value=32)
- s2 = ea.ScalarEvent(wall_time=2, step=12, value=64)
- gen.AddScalar('s1', wall_time=1, step=10, value=32)
- gen.AddScalar('s2', wall_time=2, step=12, value=64)
- acc.Reload()
- self.assertEqual(acc.Scalars('s1'), [s1])
- self.assertEqual(acc.Scalars('s2'), [s2])
-
- def _compareHealthPills(self, expected_event, gotten_event):
- """Compares 2 health pills.
-
- Args:
- expected_event: The expected HealthPillEvent.
- gotten_event: The gotten HealthPillEvent.
- """
- self.assertEqual(expected_event.wall_time, gotten_event.wall_time)
- self.assertEqual(expected_event.step, gotten_event.step)
- self.assertEqual(expected_event.node_name, gotten_event.node_name)
- self.assertEqual(expected_event.output_slot, gotten_event.output_slot)
- self.assertEqual(len(expected_event.value), len(gotten_event.value))
- for i, expected_value in enumerate(expected_event.value):
- self.assertEqual(expected_value, gotten_event.value[i])
-
- def testHealthPills(self):
- gen = _EventGenerator(self)
- acc = ea.EventAccumulator(gen)
- gen.AddHealthPill(13371337, 41, 'Add', 0, range(1, 13))
- gen.AddHealthPill(13381338, 42, 'Add', 1, range(42, 54))
-
- acc = ea.EventAccumulator(gen)
- acc.Reload()
-
- # Retrieve the health pills for each node name.
- gotten_events = acc.HealthPills('Add')
- self.assertEquals(2, len(gotten_events))
- self._compareHealthPills(
- ea.HealthPillEvent(
- wall_time=13371337,
- step=41,
- node_name='Add',
- output_slot=0,
- value=range(1, 13)),
- gotten_events[0])
- self._compareHealthPills(
- ea.HealthPillEvent(
- wall_time=13381338,
- step=42,
- node_name='Add',
- output_slot=1,
- value=range(42, 54)),
- gotten_events[1])
-
- def testHistograms(self):
- gen = _EventGenerator(self)
- acc = ea.EventAccumulator(gen)
-
- val1 = ea.HistogramValue(
- min=1,
- max=2,
- num=3,
- sum=4,
- sum_squares=5,
- bucket_limit=[1, 2, 3],
- bucket=[0, 3, 0])
- val2 = ea.HistogramValue(
- min=-2,
- max=3,
- num=4,
- sum=5,
- sum_squares=6,
- bucket_limit=[2, 3, 4],
- bucket=[1, 3, 0])
-
- hst1 = ea.HistogramEvent(wall_time=1, step=10, histogram_value=val1)
- hst2 = ea.HistogramEvent(wall_time=2, step=12, histogram_value=val2)
- gen.AddHistogram(
- 'hst1',
- wall_time=1,
- step=10,
- hmin=1,
- hmax=2,
- hnum=3,
- hsum=4,
- hsum_squares=5,
- hbucket_limit=[1, 2, 3],
- hbucket=[0, 3, 0])
- gen.AddHistogram(
- 'hst2',
- wall_time=2,
- step=12,
- hmin=-2,
- hmax=3,
- hnum=4,
- hsum=5,
- hsum_squares=6,
- hbucket_limit=[2, 3, 4],
- hbucket=[1, 3, 0])
- acc.Reload()
- self.assertEqual(acc.Histograms('hst1'), [hst1])
- self.assertEqual(acc.Histograms('hst2'), [hst2])
-
- def testCompressedHistograms(self):
- gen = _EventGenerator(self)
- acc = ea.EventAccumulator(gen, compression_bps=(0, 2500, 5000, 7500, 10000))
-
- gen.AddHistogram(
- 'hst1',
- wall_time=1,
- step=10,
- hmin=1,
- hmax=2,
- hnum=3,
- hsum=4,
- hsum_squares=5,
- hbucket_limit=[1, 2, 3],
- hbucket=[0, 3, 0])
- gen.AddHistogram(
- 'hst2',
- wall_time=2,
- step=12,
- hmin=-2,
- hmax=3,
- hnum=4,
- hsum=5,
- hsum_squares=6,
- hbucket_limit=[2, 3, 4],
- hbucket=[1, 3, 0])
- acc.Reload()
-
- # Create the expected values after compressing hst1
- expected_vals1 = [
- ea.CompressedHistogramValue(bp, val)
-        for bp, val in [(0, 1.0), (2500, 1.25), (5000, 1.5), (7500, 1.75),
-                        (10000, 2.0)]
- ]
- expected_cmphst1 = ea.CompressedHistogramEvent(
- wall_time=1, step=10, compressed_histogram_values=expected_vals1)
- self.assertEqual(acc.CompressedHistograms('hst1'), [expected_cmphst1])
-
- # Create the expected values after compressing hst2
- expected_vals2 = [
- ea.CompressedHistogramValue(bp, val)
- for bp, val in [(0, -2),
- (2500, 2),
- (5000, 2 + 1 / 3),
- (7500, 2 + 2 / 3),
- (10000, 3)]
- ]
- expected_cmphst2 = ea.CompressedHistogramEvent(
- wall_time=2, step=12, compressed_histogram_values=expected_vals2)
- self.assertEqual(acc.CompressedHistograms('hst2'), [expected_cmphst2])
-
- def testCompressedHistogramsWithEmptyHistogram(self):
- gen = _EventGenerator(self)
- acc = ea.EventAccumulator(gen, compression_bps=(0, 2500, 5000, 7500, 10000))
-
- gen.AddHistogram(
- 'hst1',
- wall_time=1,
- step=10,
- hmin=None,
- hmax=None,
- hnum=0,
- hsum=0,
- hsum_squares=0,
- hbucket_limit=[1, 2, 3],
- hbucket=[0, 0, 0])
- acc.Reload()
-
- # Create the expected values after compressing hst1
- expected_vals1 = [
- ea.CompressedHistogramValue(bp, val)
- for bp, val in [(0, 0.0), (2500, 0), (5000, 0), (7500, 0), (10000, 0)]
- ]
- expected_cmphst1 = ea.CompressedHistogramEvent(
- wall_time=1, step=10, compressed_histogram_values=expected_vals1)
- self.assertEqual(acc.CompressedHistograms('hst1'), [expected_cmphst1])
-
- def testCompressHistogram_uglyHistogram(self):
- bps = (0, 668, 1587, 3085, 5000, 6915, 8413, 9332, 10000)
- histogram_values = ea.HistogramValue(
- min=0.0,
- max=1.0,
- num=960.0,
- sum=64.0,
- sum_squares=64.0,
- bucket_limit=[
- 0.0, 1e-12, 0.917246389039776, 1.0089710279437536,
- 1.7976931348623157e+308
- ],
- bucket=[0.0, 896.0, 0.0, 64.0, 0.0])
- histogram_event = ea.HistogramEvent(0, 0, histogram_values)
- compressed_event = ea._CompressHistogram(histogram_event, bps)
- vals = compressed_event.compressed_histogram_values
- self.assertEquals(tuple(v.basis_point for v in vals), bps)
- self.assertAlmostEqual(vals[0].value, 0.0)
- self.assertAlmostEqual(vals[1].value, 7.157142857142856e-14)
- self.assertAlmostEqual(vals[2].value, 1.7003571428571426e-13)
- self.assertAlmostEqual(vals[3].value, 3.305357142857143e-13)
- self.assertAlmostEqual(vals[4].value, 5.357142857142857e-13)
- self.assertAlmostEqual(vals[5].value, 7.408928571428571e-13)
- self.assertAlmostEqual(vals[6].value, 9.013928571428571e-13)
- self.assertAlmostEqual(vals[7].value, 9.998571428571429e-13)
- self.assertAlmostEqual(vals[8].value, 1.0)
-
- def testImages(self):
- gen = _EventGenerator(self)
- acc = ea.EventAccumulator(gen)
- im1 = ea.ImageEvent(
- wall_time=1,
- step=10,
- encoded_image_string=b'big',
- width=400,
- height=300)
- im2 = ea.ImageEvent(
- wall_time=2,
- step=12,
- encoded_image_string=b'small',
- width=40,
- height=30)
- gen.AddImage(
- 'im1',
- wall_time=1,
- step=10,
- encoded_image_string=b'big',
- width=400,
- height=300)
- gen.AddImage(
- 'im2',
- wall_time=2,
- step=12,
- encoded_image_string=b'small',
- width=40,
- height=30)
- acc.Reload()
- self.assertEqual(acc.Images('im1'), [im1])
- self.assertEqual(acc.Images('im2'), [im2])
-
- def testAudio(self):
- gen = _EventGenerator(self)
- acc = ea.EventAccumulator(gen)
- snd1 = ea.AudioEvent(
- wall_time=1,
- step=10,
- encoded_audio_string=b'big',
- content_type='audio/wav',
- sample_rate=44100,
- length_frames=441000)
- snd2 = ea.AudioEvent(
- wall_time=2,
- step=12,
- encoded_audio_string=b'small',
- content_type='audio/wav',
- sample_rate=44100,
- length_frames=44100)
- gen.AddAudio(
- 'snd1',
- wall_time=1,
- step=10,
- encoded_audio_string=b'big',
- content_type='audio/wav',
- sample_rate=44100,
- length_frames=441000)
- gen.AddAudio(
- 'snd2',
- wall_time=2,
- step=12,
- encoded_audio_string=b'small',
- content_type='audio/wav',
- sample_rate=44100,
- length_frames=44100)
- acc.Reload()
- self.assertEqual(acc.Audio('snd1'), [snd1])
- self.assertEqual(acc.Audio('snd2'), [snd2])
-
- def testKeyError(self):
- gen = _EventGenerator(self)
- acc = ea.EventAccumulator(gen)
- acc.Reload()
- with self.assertRaises(KeyError):
- acc.Scalars('s1')
- with self.assertRaises(KeyError):
- acc.Scalars('hst1')
- with self.assertRaises(KeyError):
- acc.Scalars('im1')
- with self.assertRaises(KeyError):
- acc.Histograms('s1')
- with self.assertRaises(KeyError):
- acc.Histograms('im1')
- with self.assertRaises(KeyError):
- acc.Images('s1')
- with self.assertRaises(KeyError):
- acc.Images('hst1')
- with self.assertRaises(KeyError):
- acc.Audio('s1')
- with self.assertRaises(KeyError):
- acc.Audio('hst1')
-
- def testNonValueEvents(self):
- """Tests that non-value events in the generator don't cause early exits."""
- gen = _EventGenerator(self)
- acc = ea.EventAccumulator(gen)
- gen.AddScalar('s1', wall_time=1, step=10, value=20)
- gen.AddEvent(event_pb2.Event(wall_time=2, step=20, file_version='nots2'))
- gen.AddScalar('s3', wall_time=3, step=100, value=1)
- gen.AddHistogram('hst1')
- gen.AddImage('im1')
- gen.AddAudio('snd1')
-
- acc.Reload()
- self.assertTagsEqual(acc.Tags(), {
- ea.IMAGES: ['im1'],
- ea.AUDIO: ['snd1'],
- ea.SCALARS: ['s1', 's3'],
- ea.HISTOGRAMS: ['hst1'],
- ea.COMPRESSED_HISTOGRAMS: ['hst1'],
- })
-
- def testExpiredDataDiscardedAfterRestartForFileVersionLessThan2(self):
- """Tests that events are discarded after a restart is detected.
-
- If a step value is observed to be lower than what was previously seen,
- this should force a discard of all previous items with the same tag
- that are outdated.
-
- Only file versions < 2 use this out-of-order discard logic. Later versions
- discard events based on the step value of SessionLog.START.
- """
- warnings = []
- self.stubs.Set(logging, 'warn', warnings.append)
-
- gen = _EventGenerator(self)
- acc = ea.EventAccumulator(gen)
-
- gen.AddEvent(
- event_pb2.Event(
- wall_time=0, step=0, file_version='brain.Event:1'))
- gen.AddScalar('s1', wall_time=1, step=100, value=20)
- gen.AddScalar('s1', wall_time=1, step=200, value=20)
- gen.AddScalar('s1', wall_time=1, step=300, value=20)
- acc.Reload()
-    ## Check that the number of items is what it should be
- self.assertEqual([x.step for x in acc.Scalars('s1')], [100, 200, 300])
-
- gen.AddScalar('s1', wall_time=1, step=101, value=20)
- gen.AddScalar('s1', wall_time=1, step=201, value=20)
- gen.AddScalar('s1', wall_time=1, step=301, value=20)
- acc.Reload()
- ## Check that we have discarded 200 and 300 from s1
- self.assertEqual([x.step for x in acc.Scalars('s1')], [100, 101, 201, 301])
-
- def testOrphanedDataNotDiscardedIfFlagUnset(self):
- """Tests that events are not discarded if purge_orphaned_data is false.
- """
- gen = _EventGenerator(self)
- acc = ea.EventAccumulator(gen, purge_orphaned_data=False)
-
- gen.AddEvent(
- event_pb2.Event(
- wall_time=0, step=0, file_version='brain.Event:1'))
- gen.AddScalar('s1', wall_time=1, step=100, value=20)
- gen.AddScalar('s1', wall_time=1, step=200, value=20)
- gen.AddScalar('s1', wall_time=1, step=300, value=20)
- acc.Reload()
-    ## Check that the number of items is what it should be
- self.assertEqual([x.step for x in acc.Scalars('s1')], [100, 200, 300])
-
- gen.AddScalar('s1', wall_time=1, step=101, value=20)
- gen.AddScalar('s1', wall_time=1, step=201, value=20)
- gen.AddScalar('s1', wall_time=1, step=301, value=20)
- acc.Reload()
-    ## Check that nothing was discarded from s1, since purging is disabled
- self.assertEqual([x.step for x in acc.Scalars('s1')],
- [100, 200, 300, 101, 201, 301])
-
- def testEventsDiscardedPerTagAfterRestartForFileVersionLessThan2(self):
- """Tests that event discards after restart, only affect the misordered tag.
-
- If a step value is observed to be lower than what was previously seen,
- this should force a discard of all previous items that are outdated, but
- only for the out of order tag. Other tags should remain unaffected.
-
- Only file versions < 2 use this out-of-order discard logic. Later versions
- discard events based on the step value of SessionLog.START.
- """
- warnings = []
- self.stubs.Set(logging, 'warn', warnings.append)
-
- gen = _EventGenerator(self)
- acc = ea.EventAccumulator(gen)
-
- gen.AddEvent(
- event_pb2.Event(
- wall_time=0, step=0, file_version='brain.Event:1'))
- gen.AddScalar('s1', wall_time=1, step=100, value=20)
- gen.AddScalar('s1', wall_time=1, step=200, value=20)
- gen.AddScalar('s1', wall_time=1, step=300, value=20)
- gen.AddScalar('s1', wall_time=1, step=101, value=20)
- gen.AddScalar('s1', wall_time=1, step=201, value=20)
- gen.AddScalar('s1', wall_time=1, step=301, value=20)
-
- gen.AddScalar('s2', wall_time=1, step=101, value=20)
- gen.AddScalar('s2', wall_time=1, step=201, value=20)
- gen.AddScalar('s2', wall_time=1, step=301, value=20)
-
- acc.Reload()
- ## Check that we have discarded 200 and 300
- self.assertEqual([x.step for x in acc.Scalars('s1')], [100, 101, 201, 301])
-
- ## Check that s1 discards do not affect s2
- ## i.e. check that only events from the out of order tag are discarded
- self.assertEqual([x.step for x in acc.Scalars('s2')], [101, 201, 301])
-
- def testOnlySummaryEventsTriggerDiscards(self):
- """Test that file version event does not trigger data purge."""
- gen = _EventGenerator(self)
- acc = ea.EventAccumulator(gen)
- gen.AddScalar('s1', wall_time=1, step=100, value=20)
- ev1 = event_pb2.Event(wall_time=2, step=0, file_version='brain.Event:1')
- graph_bytes = graph_pb2.GraphDef().SerializeToString()
- ev2 = event_pb2.Event(wall_time=3, step=0, graph_def=graph_bytes)
- gen.AddEvent(ev1)
- gen.AddEvent(ev2)
- acc.Reload()
- self.assertEqual([x.step for x in acc.Scalars('s1')], [100])
-
- def testSessionLogStartMessageDiscardsExpiredEvents(self):
- """Test that SessionLog.START message discards expired events.
-
- This discard logic is preferred over the out-of-order step discard logic,
- but this logic can only be used for event protos which have the SessionLog
- enum, which was introduced to event.proto for file_version >= brain.Event:2.
- """
- gen = _EventGenerator(self)
- acc = ea.EventAccumulator(gen)
- gen.AddEvent(
- event_pb2.Event(
- wall_time=0, step=1, file_version='brain.Event:2'))
-
- gen.AddScalar('s1', wall_time=1, step=100, value=20)
- gen.AddScalar('s1', wall_time=1, step=200, value=20)
- gen.AddScalar('s1', wall_time=1, step=300, value=20)
- gen.AddScalar('s1', wall_time=1, step=400, value=20)
-
- gen.AddScalar('s2', wall_time=1, step=202, value=20)
- gen.AddScalar('s2', wall_time=1, step=203, value=20)
-
- slog = event_pb2.SessionLog(status=event_pb2.SessionLog.START)
- gen.AddEvent(event_pb2.Event(wall_time=2, step=201, session_log=slog))
- acc.Reload()
- self.assertEqual([x.step for x in acc.Scalars('s1')], [100, 200])
- self.assertEqual([x.step for x in acc.Scalars('s2')], [])
-
- def testFirstEventTimestamp(self):
- """Test that FirstEventTimestamp() returns wall_time of the first event."""
- gen = _EventGenerator(self)
- acc = ea.EventAccumulator(gen)
- gen.AddEvent(
- event_pb2.Event(
- wall_time=10, step=20, file_version='brain.Event:2'))
- gen.AddScalar('s1', wall_time=30, step=40, value=20)
- self.assertEqual(acc.FirstEventTimestamp(), 10)
-
- def testReloadPopulatesFirstEventTimestamp(self):
- """Test that Reload() means FirstEventTimestamp() won't load events."""
- gen = _EventGenerator(self)
- acc = ea.EventAccumulator(gen)
- gen.AddEvent(
- event_pb2.Event(
- wall_time=1, step=2, file_version='brain.Event:2'))
-
- acc.Reload()
-
- def _Die(*args, **kwargs): # pylint: disable=unused-argument
- raise RuntimeError('Load() should not be called')
-
- self.stubs.Set(gen, 'Load', _Die)
- self.assertEqual(acc.FirstEventTimestamp(), 1)
-
- def testFirstEventTimestampLoadsEvent(self):
- """Test that FirstEventTimestamp() doesn't discard the loaded event."""
- gen = _EventGenerator(self)
- acc = ea.EventAccumulator(gen)
- gen.AddEvent(
- event_pb2.Event(
- wall_time=1, step=2, file_version='brain.Event:2'))
-
- self.assertEqual(acc.FirstEventTimestamp(), 1)
- acc.Reload()
- self.assertEqual(acc.file_version, 2.0)
-
- def testTFSummaryScalar(self):
- """Verify processing of tf.summary.scalar."""
- event_sink = _EventGenerator(self, zero_out_timestamps=True)
- writer = SummaryToEventTransformer(event_sink)
- with self.test_session() as sess:
- ipt = array_ops.placeholder(dtypes.float32)
- summary_lib.scalar('scalar1', ipt)
- summary_lib.scalar('scalar2', ipt * ipt)
- merged = summary_lib.merge_all()
- writer.add_graph(sess.graph)
- for i in xrange(10):
- summ = sess.run(merged, feed_dict={ipt: i})
- writer.add_summary(summ, global_step=i)
-
- accumulator = ea.EventAccumulator(event_sink)
- accumulator.Reload()
-
- seq1 = [ea.ScalarEvent(wall_time=0, step=i, value=i) for i in xrange(10)]
- seq2 = [
- ea.ScalarEvent(
- wall_time=0, step=i, value=i * i) for i in xrange(10)
- ]
-
- self.assertTagsEqual(accumulator.Tags(), {
- ea.SCALARS: ['scalar1', 'scalar2'],
- ea.GRAPH: True,
- ea.META_GRAPH: False,
- })
-
- self.assertEqual(accumulator.Scalars('scalar1'), seq1)
- self.assertEqual(accumulator.Scalars('scalar2'), seq2)
- first_value = accumulator.Scalars('scalar1')[0].value
- self.assertTrue(isinstance(first_value, float))
-
- def testTFSummaryImage(self):
- """Verify processing of tf.summary.image."""
- event_sink = _EventGenerator(self, zero_out_timestamps=True)
- writer = SummaryToEventTransformer(event_sink)
- with self.test_session() as sess:
- ipt = array_ops.ones([10, 4, 4, 3], dtypes.uint8)
-      # This is an interesting example, because the old tf.image_summary op
-      # would throw an error here due to tag reuse.
-      # Using the tf node name instead allows the same tag argument to be
-      # reused across the image summaries.
- with ops.name_scope('1'):
- summary_lib.image('images', ipt, max_outputs=1)
- with ops.name_scope('2'):
- summary_lib.image('images', ipt, max_outputs=2)
- with ops.name_scope('3'):
- summary_lib.image('images', ipt, max_outputs=3)
- merged = summary_lib.merge_all()
- writer.add_graph(sess.graph)
- for i in xrange(10):
- summ = sess.run(merged)
- writer.add_summary(summ, global_step=i)
-
- accumulator = ea.EventAccumulator(event_sink)
- accumulator.Reload()
-
- tags = [
- u'1/images/image', u'2/images/image/0', u'2/images/image/1',
- u'3/images/image/0', u'3/images/image/1', u'3/images/image/2'
- ]
-
- self.assertTagsEqual(accumulator.Tags(), {
- ea.IMAGES: tags,
- ea.GRAPH: True,
- ea.META_GRAPH: False,
- })
-
- def testTFSummaryTensor(self):
- """Verify processing of tf.summary.tensor."""
- event_sink = _EventGenerator(self, zero_out_timestamps=True)
- writer = SummaryToEventTransformer(event_sink)
- with self.test_session() as sess:
- summary_lib.tensor_summary('scalar', constant_op.constant(1.0))
- summary_lib.tensor_summary('vector', constant_op.constant(
- [1.0, 2.0, 3.0]))
- summary_lib.tensor_summary('string',
- constant_op.constant(six.b('foobar')))
- merged = summary_lib.merge_all()
- summ = sess.run(merged)
- writer.add_summary(summ, 0)
-
- accumulator = ea.EventAccumulator(event_sink)
- accumulator.Reload()
-
- self.assertTagsEqual(accumulator.Tags(), {
- ea.TENSORS: ['scalar', 'vector', 'string'],
- })
-
- scalar_proto = accumulator.Tensors('scalar')[0].tensor_proto
- scalar = tensor_util.MakeNdarray(scalar_proto)
- vector_proto = accumulator.Tensors('vector')[0].tensor_proto
- vector = tensor_util.MakeNdarray(vector_proto)
- string_proto = accumulator.Tensors('string')[0].tensor_proto
- string = tensor_util.MakeNdarray(string_proto)
-
- self.assertTrue(np.array_equal(scalar, 1.0))
- self.assertTrue(np.array_equal(vector, [1.0, 2.0, 3.0]))
- self.assertTrue(np.array_equal(string, six.b('foobar')))
-
-
-class RealisticEventAccumulatorTest(EventAccumulatorTest):
-
- def setUp(self):
- super(RealisticEventAccumulatorTest, self).setUp()
-
- def testScalarsRealistically(self):
- """Test accumulator by writing values and then reading them."""
-
- def FakeScalarSummary(tag, value):
- value = summary_pb2.Summary.Value(tag=tag, simple_value=value)
- summary = summary_pb2.Summary(value=[value])
- return summary
-
- directory = os.path.join(self.get_temp_dir(), 'values_dir')
- if gfile.IsDirectory(directory):
- gfile.DeleteRecursively(directory)
- gfile.MkDir(directory)
-
- writer = writer_lib.FileWriter(directory, max_queue=100)
-
- with ops.Graph().as_default() as graph:
- _ = constant_op.constant([2.0, 1.0])
- # Add a graph to the summary writer.
- writer.add_graph(graph)
- meta_graph_def = saver.export_meta_graph(
- graph_def=graph.as_graph_def(add_shapes=True))
- writer.add_meta_graph(meta_graph_def)
-
- run_metadata = config_pb2.RunMetadata()
- device_stats = run_metadata.step_stats.dev_stats.add()
- device_stats.device = 'test device'
- writer.add_run_metadata(run_metadata, 'test run')
-
- # Write a bunch of events using the writer.
- for i in xrange(30):
- summ_id = FakeScalarSummary('id', i)
- summ_sq = FakeScalarSummary('sq', i * i)
- writer.add_summary(summ_id, i * 5)
- writer.add_summary(summ_sq, i * 5)
- writer.flush()
-
- # Verify that we can load those events properly
- acc = ea.EventAccumulator(directory)
- acc.Reload()
- self.assertTagsEqual(acc.Tags(), {
- ea.SCALARS: ['id', 'sq'],
- ea.GRAPH: True,
- ea.META_GRAPH: True,
- ea.RUN_METADATA: ['test run'],
- })
- id_events = acc.Scalars('id')
- sq_events = acc.Scalars('sq')
- self.assertEqual(30, len(id_events))
- self.assertEqual(30, len(sq_events))
- for i in xrange(30):
- self.assertEqual(i * 5, id_events[i].step)
- self.assertEqual(i * 5, sq_events[i].step)
- self.assertEqual(i, id_events[i].value)
- self.assertEqual(i * i, sq_events[i].value)
-
- # Write a few more events to test incremental reloading
- for i in xrange(30, 40):
- summ_id = FakeScalarSummary('id', i)
- summ_sq = FakeScalarSummary('sq', i * i)
- writer.add_summary(summ_id, i * 5)
- writer.add_summary(summ_sq, i * 5)
- writer.flush()
-
- # Verify we can now see all of the data
- acc.Reload()
- id_events = acc.Scalars('id')
- sq_events = acc.Scalars('sq')
- self.assertEqual(40, len(id_events))
- self.assertEqual(40, len(sq_events))
- for i in xrange(40):
- self.assertEqual(i * 5, id_events[i].step)
- self.assertEqual(i * 5, sq_events[i].step)
- self.assertEqual(i, id_events[i].value)
- self.assertEqual(i * i, sq_events[i].value)
- self.assertProtoEquals(graph.as_graph_def(add_shapes=True), acc.Graph())
- self.assertProtoEquals(meta_graph_def, acc.MetaGraph())
-
- def testGraphFromMetaGraphBecomesAvailable(self):
- """Test accumulator by writing values and then reading them."""
-
- directory = os.path.join(self.get_temp_dir(), 'metagraph_test_values_dir')
- if gfile.IsDirectory(directory):
- gfile.DeleteRecursively(directory)
- gfile.MkDir(directory)
-
- writer = writer_lib.FileWriter(directory, max_queue=100)
-
- with ops.Graph().as_default() as graph:
- _ = constant_op.constant([2.0, 1.0])
- # Add only a metagraph (not the graph itself) to the summary writer.
- meta_graph_def = saver.export_meta_graph(
- graph_def=graph.as_graph_def(add_shapes=True))
- writer.add_meta_graph(meta_graph_def)
-
- writer.flush()
-
- # Verify that we can load those events properly
- acc = ea.EventAccumulator(directory)
- acc.Reload()
- self.assertTagsEqual(acc.Tags(), {
- ea.GRAPH: True,
- ea.META_GRAPH: True,
- })
- self.assertProtoEquals(graph.as_graph_def(add_shapes=True), acc.Graph())
- self.assertProtoEquals(meta_graph_def, acc.MetaGraph())
-
-
-if __name__ == '__main__':
- test.main()
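The tests above exercise the accumulator end to end. For orientation, here is a minimal usage sketch (not part of the deleted file), assuming the pre-move import path that this change removes and a placeholder log directory; the tag names are examples only.

```python
# A minimal sketch, assuming the pre-move module path and a directory
# previously written by a summary FileWriter; 'loss' is a placeholder tag.
from tensorflow.python.summary import event_accumulator as ea

acc = ea.EventAccumulator('/tmp/run1')
acc.Reload()  # Loads (or incrementally re-loads) events from disk.

print(acc.Tags()[ea.SCALARS])       # e.g. ['loss', 'accuracy']
for event in acc.Scalars('loss'):   # Each entry is ScalarEvent(wall_time, step, value).
  print(event.step, event.value)
```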
diff --git a/tensorflow/python/summary/event_file_inspector.py b/tensorflow/python/summary/event_file_inspector.py
deleted file mode 100644
index 3f56bb58e1..0000000000
--- a/tensorflow/python/summary/event_file_inspector.py
+++ /dev/null
@@ -1,427 +0,0 @@
-# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ==============================================================================
-
-"""Logic for TensorBoard inspector to help humans investigate event files.
-
-Example usages:
-tensorboard --inspect --event_file=myevents.out
-tensorboard --inspect --event_file=myevents.out --tag=loss
-tensorboard --inspect --logdir=mylogdir
-tensorboard --inspect --logdir=mylogdir --tag=loss
-
-
-This script runs over a logdir and creates an InspectionUnit for every
-subdirectory with event files. If running over an event file, it creates only
-one InspectionUnit. One block of output is printed to console for each
-InspectionUnit.
-
-The primary content of an InspectionUnit is the dict field_to_obs that maps
-fields (e.g. "scalar", "histogram", "session_log:start", etc.) to a list of
-Observations for the field. Observations correspond one-to-one with Events in an
-event file but contain less information because they only store what is
-necessary to generate the final console output.
-
-The final output is rendered to console by applying some aggregating function
-to the lists of Observations. Different functions are applied depending on the
-type of field. For instance, for "scalar" fields, the inspector shows aggregate
-statistics. For other fields like "session_log:start", all observed steps are
-printed in order to aid debugging.
-
-
-[1] Query a logdir or an event file for its logged tags and summary statistics
-using --logdir or --event_file.
-
-[[event_file]] contains these tags:
-histograms
- binary/Sign/Activations
- binary/nn_tanh/act/Activations
- binary/nn_tanh/biases
- binary/nn_tanh/biases:gradient
- binary/nn_tanh/weights
- binary/nn_tanh/weights:gradient
-images
- input_images/image/0
- input_images/image/1
- input_images/image/2
-scalars
- Learning Rate
- Total Cost
- Total Cost (raw)
-
-Debug output aggregated over all tags:
-graph
- first_step 0
- last_step 0
- max_step 0
- min_step 0
- num_steps 1
- outoforder_steps []
-histograms
- first_step 491
- last_step 659823
- max_step 659823
- min_step 491
- num_steps 993
- outoforder_steps []
-images -
-scalars
- first_step 0
- last_step 659823
- max_step 659823
- min_step 0
- num_steps 1985
- outoforder_steps []
-sessionlog:checkpoint
- first_step 7129
- last_step 657167
- max_step 657167
- min_step 7129
- num_steps 99
- outoforder_steps []
-sessionlog:start
- outoforder_steps []
- steps [0L]
-sessionlog:stop -
-
-
-[2] Drill down into a particular tag using --tag.
-
-Debug output for binary/Sign/Activations:
-histograms
- first_step 491
- last_step 659823
- max_step 659823
- min_step 491
- num_steps 993
- outoforder_steps []
-"""
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-import collections
-import itertools
-import os
-
-from tensorflow.core.util.event_pb2 import SessionLog
-from tensorflow.python.platform import app
-from tensorflow.python.platform import flags
-from tensorflow.python.platform import gfile
-from tensorflow.python.summary import event_accumulator
-from tensorflow.python.summary import event_multiplexer
-from tensorflow.python.summary.impl import event_file_loader
-
-FLAGS = flags.FLAGS
-
-
-# Map of field names within summary.proto to the user-facing names that this
-# script outputs.
-SUMMARY_TYPE_TO_FIELD = {'simple_value': 'scalars',
- 'histo': 'histograms',
- 'image': 'images',
- 'audio': 'audio'}
-for summary_type in event_accumulator.SUMMARY_TYPES:
- if summary_type not in SUMMARY_TYPE_TO_FIELD:
- SUMMARY_TYPE_TO_FIELD[summary_type] = summary_type
-
-# Types of summaries that we may want to query for by tag.
-TAG_FIELDS = list(SUMMARY_TYPE_TO_FIELD.values())
-
-# Summaries that we want to see every instance of.
-LONG_FIELDS = ['sessionlog:start', 'sessionlog:stop']
-
-# Summaries that we only want an abridged digest of, since they would
-# take too much screen real estate otherwise.
-SHORT_FIELDS = ['graph', 'sessionlog:checkpoint'] + TAG_FIELDS
-
-# All summary types that we can inspect.
-TRACKED_FIELDS = SHORT_FIELDS + LONG_FIELDS
-
-# An `Observation` contains the data within each Event file that the inspector
-# cares about. The inspector accumulates Observations as it processes events.
-Observation = collections.namedtuple('Observation', ['step', 'wall_time',
- 'tag'])
-
-# An InspectionUnit is created for each organizational structure in the event
-# files visible in the final terminal output. For instance, one InspectionUnit
-# is created for each subdirectory in logdir. When asked to inspect a single
-# event file, there may only be one InspectionUnit.
-
-# The InspectionUnit contains the `name` of the organizational unit that will be
-# printed to console, a `generator` that yields `Event` protos, and a mapping
-# from string fields to `Observations` that the inspector creates.
-InspectionUnit = collections.namedtuple('InspectionUnit', ['name', 'generator',
- 'field_to_obs'])
-
-PRINT_SEPARATOR = '=' * 70 + '\n'
-
-
-def get_field_to_observations_map(generator, query_for_tag=''):
- """Return a field to `Observations` dict for the event generator.
-
- Args:
- generator: A generator over event protos.
- query_for_tag: A string; if specified, only create observations for
- events with this tag name.
-
- Returns:
- A dict mapping keys in `TRACKED_FIELDS` to an `Observation` list.
- """
-
- def increment(stat, event, tag=''):
- assert stat in TRACKED_FIELDS
- field_to_obs[stat].append(Observation(step=event.step,
- wall_time=event.wall_time,
- tag=tag)._asdict())
-
- field_to_obs = dict([(t, []) for t in TRACKED_FIELDS])
-
- for event in generator:
- ## Process the event
- if event.HasField('graph_def') and (not query_for_tag):
- increment('graph', event)
- if event.HasField('session_log') and (not query_for_tag):
- status = event.session_log.status
- if status == SessionLog.START:
- increment('sessionlog:start', event)
- elif status == SessionLog.STOP:
- increment('sessionlog:stop', event)
- elif status == SessionLog.CHECKPOINT:
- increment('sessionlog:checkpoint', event)
- elif event.HasField('summary'):
- for value in event.summary.value:
- if query_for_tag and value.tag != query_for_tag:
- continue
-
- for proto_name, display_name in SUMMARY_TYPE_TO_FIELD.items():
- if value.HasField(proto_name):
- increment(display_name, event, value.tag)
- return field_to_obs
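For illustration, here is a small sketch (not part of the deleted file) of how the helper above is fed and what it returns, using two hand-built placeholder scalar events; the tag and values are made up.

```python
# Hypothetical usage of get_field_to_observations_map, assuming the module
# context above; 'loss' and the values are placeholders.
from tensorflow.core.framework import summary_pb2
from tensorflow.core.util import event_pb2

def _scalar_event(tag, value, step):
  summary = summary_pb2.Summary(
      value=[summary_pb2.Summary.Value(tag=tag, simple_value=value)])
  return event_pb2.Event(summary=summary, step=step, wall_time=0.0)

events = [_scalar_event('loss', 0.5, 0), _scalar_event('loss', 0.4, 1)]
field_to_obs = get_field_to_observations_map(iter(events))
# field_to_obs['scalars'] now holds two Observation dicts, one per event,
# each with 'step', 'wall_time' and 'tag' keys; all other fields are empty.
```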
-
-
-def get_unique_tags(field_to_obs):
- """Returns a dictionary of tags that a user could query over.
-
- Args:
- field_to_obs: Dict that maps string field to `Observation` list.
-
- Returns:
- A dict that maps keys in `TAG_FIELDS` to a list of string tags present in
- the event files. If the dict does not have any observations of the type,
- maps to an empty list so that we can render this to console.
- """
- return {field: sorted(set([x.get('tag', '') for x in observations]))
- for field, observations in field_to_obs.items()
- if field in TAG_FIELDS}
-
-
-def print_dict(d, show_missing=True):
- """Prints a shallow dict to console.
-
- Args:
- d: Dict to print.
- show_missing: Whether to show keys with empty values.
- """
- for k, v in sorted(d.items()):
- if (not v) and show_missing:
- # No instances of the key, so print missing symbol.
- print('{} -'.format(k))
- elif isinstance(v, list):
- # Value is a list, so print each item of the list.
- print(k)
- for item in v:
- print(' {}'.format(item))
- elif isinstance(v, dict):
- # Value is a dict, so print each (key, value) pair of the dict.
- print(k)
- for kk, vv in sorted(v.items()):
- print(' {:<20} {}'.format(kk, vv))
-
-
-def get_dict_to_print(field_to_obs):
- """Transform the field-to-obs mapping into a printable dictionary.
-
- Args:
- field_to_obs: Dict that maps string field to `Observation` list.
-
- Returns:
- A dict with the keys and values to print to console.
- """
-
- def compressed_steps(steps):
- return {'num_steps': len(set(steps)),
- 'min_step': min(steps),
- 'max_step': max(steps),
- 'last_step': steps[-1],
- 'first_step': steps[0],
- 'outoforder_steps': get_out_of_order(steps)}
-
- def full_steps(steps):
- return {'steps': steps, 'outoforder_steps': get_out_of_order(steps)}
-
- output = {}
- for field, observations in field_to_obs.items():
- if not observations:
- output[field] = None
- continue
-
- steps = [x['step'] for x in observations]
- if field in SHORT_FIELDS:
- output[field] = compressed_steps(steps)
- if field in LONG_FIELDS:
- output[field] = full_steps(steps)
-
- return output
-
-
-def get_out_of_order(list_of_numbers):
- """Returns elements that break the monotonically non-decreasing trend.
-
- This is used to find instances of global step values that are "out-of-order",
- which may trigger TensorBoard event discarding logic.
-
- Args:
- list_of_numbers: A list of numbers.
-
- Returns:
- A list of tuples, each containing a pair of adjacent elements from the
- input for which the second element is lower than the first.
- """
- # TODO(cassandrax): Consider changing this to only check for out-of-order
- # steps within a particular tag.
- result = []
- for i in range(len(list_of_numbers)):
- if i == 0:
- continue
- if list_of_numbers[i] < list_of_numbers[i - 1]:
- result.append((list_of_numbers[i - 1], list_of_numbers[i]))
- return result
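As a quick illustration of the helper above (the step values are made up for this example):

```python
# Global steps as they might appear in an event file; illustrative only.
steps = [0, 5, 10, 3, 7, 2]

# Each adjacent pair where the step goes backwards is reported, so this
# prints [(10, 3), (7, 2)].
print(get_out_of_order(steps))
```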
-
-
-def generators_from_logdir(logdir):
- """Returns a list of event generators for subdirectories with event files.
-
- The number of generators returned should equal the number of directories
- within logdir that contain event files. If only logdir contains event files,
- returns a list of length one.
-
- Args:
- logdir: A log directory that contains event files.
-
- Returns:
- List of event generators for each subdirectory with event files.
- """
- subdirs = event_multiplexer.GetLogdirSubdirectories(logdir)
- generators = [itertools.chain(*[
- generator_from_event_file(os.path.join(subdir, f))
- for f in gfile.ListDirectory(subdir)
- if event_accumulator.IsTensorFlowEventsFile(os.path.join(subdir, f))
- ]) for subdir in subdirs]
- return generators
-
-
-def generator_from_event_file(event_file):
- """Returns a generator that yields events from an event file."""
- return event_file_loader.EventFileLoader(event_file).Load()
-
-
-def get_inspection_units(logdir='', event_file='', tag=''):
- """Returns a list of InspectionUnit objects given either logdir or event_file.
-
- If logdir is given, the number of InspectionUnits should equal the
- number of directories or subdirectories that contain event files.
-
- If event_file is given, the number of InspectionUnits should be 1.
-
- Args:
- logdir: A log directory that contains event files.
- event_file: Or, a particular event file path.
- tag: An optional tag name to query for.
-
- Returns:
- A list of InspectionUnit objects.
- """
- if logdir:
- subdirs = event_multiplexer.GetLogdirSubdirectories(logdir)
- inspection_units = []
- for subdir in subdirs:
- generator = itertools.chain(*[
- generator_from_event_file(os.path.join(subdir, f))
- for f in gfile.ListDirectory(subdir)
- if event_accumulator.IsTensorFlowEventsFile(os.path.join(subdir, f))
- ])
- inspection_units.append(InspectionUnit(
- name=subdir,
- generator=generator,
- field_to_obs=get_field_to_observations_map(generator, tag)))
- if inspection_units:
- print('Found event files in:\n{}\n'.format('\n'.join(
- [u.name for u in inspection_units])))
- elif event_accumulator.IsTensorFlowEventsFile(logdir):
- print(
- 'It seems that {} may be an event file instead of a logdir. If this '
- 'is the case, use --event_file instead of --logdir to pass '
- 'it in.'.format(logdir))
- else:
- print('No event files found within logdir {}'.format(logdir))
- return inspection_units
- elif event_file:
- generator = generator_from_event_file(event_file)
- return [InspectionUnit(
- name=event_file,
- generator=generator,
- field_to_obs=get_field_to_observations_map(generator, tag))]
-
-
-def inspect(logdir='', event_file='', tag=''):
- """Main function for inspector that prints out a digest of event files.
-
- Args:
- logdir: A log directory that contains event files.
- event_file: Or, a particular event file path.
- tag: An optional tag name to query for.
-
- Raises:
- ValueError: If neither logdir nor event_file is given, or if both are given.
- """
- if logdir and event_file:
- raise ValueError(
- 'Must specify either --logdir or --event_file, but not both.')
- if not (logdir or event_file):
- raise ValueError('Must specify either --logdir or --event_file.')
-
- print(PRINT_SEPARATOR +
- 'Processing event files... (this can take a few minutes)\n' +
- PRINT_SEPARATOR)
- inspection_units = get_inspection_units(logdir, event_file, tag)
-
- for unit in inspection_units:
- if tag:
- print('Event statistics for tag {} in {}:'.format(tag, unit.name))
- else:
- # If the user is not inspecting a particular tag, also print the list of
- # all available tags that they can query.
- print('These tags are in {}:'.format(unit.name))
- print_dict(get_unique_tags(unit.field_to_obs))
- print(PRINT_SEPARATOR)
- print('Event statistics for {}:'.format(unit.name))
-
- print_dict(get_dict_to_print(unit.field_to_obs), show_missing=(not tag))
- print(PRINT_SEPARATOR)
-
-
-if __name__ == '__main__':
- app.run()
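Although the module is normally driven through the `tensorboard --inspect` flags shown in its docstring, the pieces above can also be called directly. A minimal sketch, assuming the pre-move import path removed by this change and a placeholder logdir:

```python
# Hypothetical programmatic use of the inspector helpers defined above.
from tensorflow.python.summary import event_file_inspector as efi

units = efi.get_inspection_units(logdir='/tmp/my_logdir')
for unit in units:
  print('Tags in {}:'.format(unit.name))
  efi.print_dict(efi.get_unique_tags(unit.field_to_obs))
  print('Event statistics:')
  efi.print_dict(efi.get_dict_to_print(unit.field_to_obs))
```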
diff --git a/tensorflow/python/summary/event_file_inspector_test.py b/tensorflow/python/summary/event_file_inspector_test.py
deleted file mode 100644
index 940586cf97..0000000000
--- a/tensorflow/python/summary/event_file_inspector_test.py
+++ /dev/null
@@ -1,172 +0,0 @@
-# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ==============================================================================
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-import os
-import shutil
-
-from tensorflow.core.framework.summary_pb2 import HistogramProto
-from tensorflow.core.framework.summary_pb2 import Summary
-from tensorflow.core.util.event_pb2 import SessionLog
-from tensorflow.python.framework import test_util
-from tensorflow.python.platform import googletest
-from tensorflow.python.summary import event_file_inspector as efi
-from tensorflow.python.training.summary_io import SummaryWriter
-
-
-class EventFileInspectorTest(test_util.TensorFlowTestCase):
-
- def setUp(self):
- self.logdir = os.path.join(self.get_temp_dir(), 'tfevents')
- self._MakeDirectoryIfNotExists(self.logdir)
-
- def tearDown(self):
- shutil.rmtree(self.logdir)
-
- def _MakeDirectoryIfNotExists(self, path):
- if not os.path.exists(path):
- os.mkdir(path)
-
- def _WriteScalarSummaries(self, data, subdirs=('',)):
- # Writes the data to an events file under each of the given subdirs.
- # If subdirs is given, writes data identically to all subdirectories.
- for subdir_ in subdirs:
- subdir = os.path.join(self.logdir, subdir_)
- self._MakeDirectoryIfNotExists(subdir)
-
- sw = SummaryWriter(subdir)
- for datum in data:
- summary = Summary()
- if 'simple_value' in datum:
- summary.value.add(tag=datum['tag'],
- simple_value=datum['simple_value'])
- sw.add_summary(summary, global_step=datum['step'])
- elif 'histo' in datum:
- summary.value.add(tag=datum['tag'], histo=HistogramProto())
- sw.add_summary(summary, global_step=datum['step'])
- elif 'session_log' in datum:
- sw.add_session_log(datum['session_log'], global_step=datum['step'])
- sw.close()
-
- def testEmptyLogdir(self):
- # Nothing was written to logdir
- units = efi.get_inspection_units(self.logdir)
- self.assertEqual([], units)
-
- def testGetAvailableTags(self):
- data = [{'tag': 'c', 'histo': 2, 'step': 10},
- {'tag': 'c', 'histo': 2, 'step': 11},
- {'tag': 'c', 'histo': 2, 'step': 9},
- {'tag': 'b', 'simple_value': 2, 'step': 20},
- {'tag': 'b', 'simple_value': 2, 'step': 15},
- {'tag': 'a', 'simple_value': 2, 'step': 3}]
- self._WriteScalarSummaries(data)
- units = efi.get_inspection_units(self.logdir)
- tags = efi.get_unique_tags(units[0].field_to_obs)
- self.assertEqual(['a', 'b'], tags['scalars'])
- self.assertEqual(['c'], tags['histograms'])
-
- def testInspectAll(self):
- data = [{'tag': 'c', 'histo': 2, 'step': 10},
- {'tag': 'c', 'histo': 2, 'step': 11},
- {'tag': 'c', 'histo': 2, 'step': 9},
- {'tag': 'b', 'simple_value': 2, 'step': 20},
- {'tag': 'b', 'simple_value': 2, 'step': 15},
- {'tag': 'a', 'simple_value': 2, 'step': 3}]
- self._WriteScalarSummaries(data)
- units = efi.get_inspection_units(self.logdir)
- printable = efi.get_dict_to_print(units[0].field_to_obs)
- self.assertEqual(printable['histograms']['max_step'], 11)
- self.assertEqual(printable['histograms']['min_step'], 9)
- self.assertEqual(printable['histograms']['num_steps'], 3)
- self.assertEqual(printable['histograms']['last_step'], 9)
- self.assertEqual(printable['histograms']['first_step'], 10)
- self.assertEqual(printable['histograms']['outoforder_steps'], [(11, 9)])
-
- self.assertEqual(printable['scalars']['max_step'], 20)
- self.assertEqual(printable['scalars']['min_step'], 3)
- self.assertEqual(printable['scalars']['num_steps'], 3)
- self.assertEqual(printable['scalars']['last_step'], 3)
- self.assertEqual(printable['scalars']['first_step'], 20)
- self.assertEqual(printable['scalars']['outoforder_steps'], [(20, 15),
- (15, 3)])
-
- def testInspectTag(self):
- data = [{'tag': 'c', 'histo': 2, 'step': 10},
- {'tag': 'c', 'histo': 2, 'step': 11},
- {'tag': 'c', 'histo': 2, 'step': 9},
- {'tag': 'b', 'histo': 2, 'step': 20},
- {'tag': 'b', 'simple_value': 2, 'step': 15},
- {'tag': 'a', 'simple_value': 2, 'step': 3}]
- self._WriteScalarSummaries(data)
- units = efi.get_inspection_units(self.logdir, tag='c')
- printable = efi.get_dict_to_print(units[0].field_to_obs)
- self.assertEqual(printable['histograms']['max_step'], 11)
- self.assertEqual(printable['histograms']['min_step'], 9)
- self.assertEqual(printable['histograms']['num_steps'], 3)
- self.assertEqual(printable['histograms']['last_step'], 9)
- self.assertEqual(printable['histograms']['first_step'], 10)
- self.assertEqual(printable['histograms']['outoforder_steps'], [(11, 9)])
- self.assertEqual(printable['scalars'], None)
-
- def testSessionLogSummaries(self):
- data = [
- {'session_log': SessionLog(status=SessionLog.START), 'step': 0},
- {'session_log': SessionLog(status=SessionLog.CHECKPOINT), 'step': 1},
- {'session_log': SessionLog(status=SessionLog.CHECKPOINT), 'step': 2},
- {'session_log': SessionLog(status=SessionLog.CHECKPOINT), 'step': 3},
- {'session_log': SessionLog(status=SessionLog.STOP), 'step': 4},
- {'session_log': SessionLog(status=SessionLog.START), 'step': 5},
- {'session_log': SessionLog(status=SessionLog.STOP), 'step': 6},
- ]
-
- self._WriteScalarSummaries(data)
- units = efi.get_inspection_units(self.logdir)
- self.assertEqual(1, len(units))
- printable = efi.get_dict_to_print(units[0].field_to_obs)
- self.assertEqual(printable['sessionlog:start']['steps'], [0, 5])
- self.assertEqual(printable['sessionlog:stop']['steps'], [4, 6])
- self.assertEqual(printable['sessionlog:checkpoint']['num_steps'], 3)
-
- def testInspectAllWithNestedLogdirs(self):
- data = [{'tag': 'c', 'simple_value': 2, 'step': 10},
- {'tag': 'c', 'simple_value': 2, 'step': 11},
- {'tag': 'c', 'simple_value': 2, 'step': 9},
- {'tag': 'b', 'simple_value': 2, 'step': 20},
- {'tag': 'b', 'simple_value': 2, 'step': 15},
- {'tag': 'a', 'simple_value': 2, 'step': 3}]
-
- subdirs = ['eval', 'train']
- self._WriteScalarSummaries(data, subdirs=subdirs)
- units = efi.get_inspection_units(self.logdir)
- self.assertEqual(2, len(units))
- directory_names = [os.path.join(self.logdir, name) for name in subdirs]
- self.assertEqual(directory_names, sorted([unit.name for unit in units]))
-
- for unit in units:
- printable = efi.get_dict_to_print(unit.field_to_obs)['scalars']
- self.assertEqual(printable['max_step'], 20)
- self.assertEqual(printable['min_step'], 3)
- self.assertEqual(printable['num_steps'], 6)
- self.assertEqual(printable['last_step'], 3)
- self.assertEqual(printable['first_step'], 10)
- self.assertEqual(printable['outoforder_steps'], [(11, 9), (20, 15),
- (15, 3)])
-
-if __name__ == '__main__':
- googletest.main()
diff --git a/tensorflow/python/summary/event_multiplexer.py b/tensorflow/python/summary/event_multiplexer.py
deleted file mode 100644
index 18176e10fe..0000000000
--- a/tensorflow/python/summary/event_multiplexer.py
+++ /dev/null
@@ -1,428 +0,0 @@
-# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ==============================================================================
-"""Provides an interface for working with multiple event files."""
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-import os
-import threading
-
-import six
-
-from tensorflow.python.platform import gfile
-from tensorflow.python.platform import tf_logging as logging
-from tensorflow.python.summary import event_accumulator
-from tensorflow.python.summary.impl import directory_watcher
-from tensorflow.python.summary.impl import io_wrapper
-
-
-class EventMultiplexer(object):
- """An `EventMultiplexer` manages access to multiple `EventAccumulator`s.
-
- Each `EventAccumulator` is associated with a `run`, which is a self-contained
- TensorFlow execution. The `EventMultiplexer` provides methods for extracting
- information about events from multiple `run`s.
-
- Example usage for loading specific runs from files:
-
- ```python
- x = EventMultiplexer({'run1': 'path/to/run1', 'run2': 'path/to/run2'})
- x.Reload()
- ```
-
- Example usage for loading a directory where each subdirectory is a run
-
- ```python
- (eg:) /parent/directory/path/
- /parent/directory/path/run1/
- /parent/directory/path/run1/events.out.tfevents.1001
- /parent/directory/path/run1/events.out.tfevents.1002
-
- /parent/directory/path/run2/
- /parent/directory/path/run2/events.out.tfevents.9232
-
- /parent/directory/path/run3/
- /parent/directory/path/run3/events.out.tfevents.9232
- x = EventMultiplexer().AddRunsFromDirectory('/parent/directory/path')
- (which is equivalent to:)
- x = EventMultiplexer({'run1': '/parent/directory/path/run1', 'run2': ...})
- ```
-
- If you would like to watch `/parent/directory/path`, wait for it to be created
- (if necessary) and then periodically pick up new runs, use
- `AutoloadingMultiplexer`
- @@Tensors
- """
-
- def __init__(self,
- run_path_map=None,
- size_guidance=event_accumulator.DEFAULT_SIZE_GUIDANCE,
- purge_orphaned_data=True):
- """Constructor for the `EventMultiplexer`.
-
- Args:
- run_path_map: Dict `{run: path}` which specifies the
- name of a run, and the path to find the associated events. If it is
- None, then the EventMultiplexer initializes without any runs.
- size_guidance: A dictionary mapping from `tagType` to the number of items
- to store for each tag of that type. See
- `event_accumulator.EventAccumulator` for details.
- purge_orphaned_data: Whether to discard any events that were "orphaned" by
- a TensorFlow restart.
- """
- logging.info('Event Multiplexer initializing.')
- self._accumulators_mutex = threading.Lock()
- self._accumulators = {}
- self._paths = {}
- self._reload_called = False
- self._size_guidance = size_guidance
- self.purge_orphaned_data = purge_orphaned_data
- if run_path_map is not None:
- logging.info('Event Multiplexer doing initialization load for %s',
- run_path_map)
- for (run, path) in six.iteritems(run_path_map):
- self.AddRun(path, run)
- logging.info('Event Multiplexer done initializing')
-
- def AddRun(self, path, name=None):
- """Add a run to the multiplexer.
-
- If the name is not specified, it is the same as the path.
-
- If a run by that name exists, and we are already watching the right path,
- do nothing. If we are watching a different path, replace the event
- accumulator.
-
- If `Reload` has been called, it will `Reload` the newly created
- accumulators.
-
- Args:
- path: Path to the event files (or event directory) for given run.
- name: Name of the run to add. If not provided, is set to path.
-
- Returns:
- The `EventMultiplexer`.
- """
- if name is None or name == '':
- name = path
- accumulator = None
- with self._accumulators_mutex:
- if name not in self._accumulators or self._paths[name] != path:
- if name in self._paths and self._paths[name] != path:
- # TODO(danmane) - Make it impossible to overwrite an old path with
- # a new path (just give the new path a distinct name)
- logging.warning('Conflict for name %s: old path %s, new path %s',
- name, self._paths[name], path)
- logging.info('Constructing EventAccumulator for %s', path)
- accumulator = event_accumulator.EventAccumulator(
- path,
- size_guidance=self._size_guidance,
- purge_orphaned_data=self.purge_orphaned_data)
- self._accumulators[name] = accumulator
- self._paths[name] = path
- if accumulator:
- if self._reload_called:
- accumulator.Reload()
- return self
-
- def AddRunsFromDirectory(self, path, name=None):
- """Load runs from a directory; recursively walks subdirectories.
-
- If path doesn't exist, no-op. This ensures that it is safe to call
- `AddRunsFromDirectory` multiple times, even before the directory is made.
-
- If path is a directory, load event files in the directory (if any exist) and
- recursively call AddRunsFromDirectory on any subdirectories. This means you
- can call AddRunsFromDirectory at the root of a tree of event logs and
- TensorBoard will load them all.
-
- If the `EventMultiplexer` is already loaded this will cause
- the newly created accumulators to `Reload()`.
- Args:
- path: A string path to a directory to load runs from.
- name: Optionally, what name to apply to the runs. If name is provided
- and the directory contains run subdirectories, the name of each subrun
- is the concatenation of the parent name and the subdirectory name. If
- name is provided and the directory contains event files, then a run
- is added called "name" and with the events from the path.
-
- Raises:
- ValueError: If the path exists and isn't a directory.
-
- Returns:
- The `EventMultiplexer`.
- """
- logging.info('Starting AddRunsFromDirectory: %s', path)
- for subdir in GetLogdirSubdirectories(path):
- logging.info('Adding events from directory %s', subdir)
- rpath = os.path.relpath(subdir, path)
- subname = os.path.join(name, rpath) if name else rpath
- self.AddRun(subdir, name=subname)
- logging.info('Done with AddRunsFromDirectory: %s', path)
- return self
-
- def Reload(self):
- """Call `Reload` on every `EventAccumulator`."""
- logging.info('Beginning EventMultiplexer.Reload()')
- self._reload_called = True
- # Build a list so we're safe even if the list of accumulators is modified
- # while we're reloading.
- with self._accumulators_mutex:
- items = list(self._accumulators.items())
-
- names_to_delete = set()
- for name, accumulator in items:
- try:
- accumulator.Reload()
- except (OSError, IOError) as e:
- logging.error("Unable to reload accumulator '%s': %s", name, e)
- except directory_watcher.DirectoryDeletedError:
- names_to_delete.add(name)
-
- with self._accumulators_mutex:
- for name in names_to_delete:
- logging.warning("Deleting accumulator '%s'", name)
- del self._accumulators[name]
- logging.info('Finished with EventMultiplexer.Reload()')
- return self
-
- def FirstEventTimestamp(self, run):
- """Return the timestamp of the first event of the given run.
-
- This may perform I/O if no events have been loaded yet for the run.
-
- Args:
- run: A string name of the run for which the timestamp is retrieved.
-
- Returns:
- The wall_time of the first event of the run, which will typically be
- seconds since the epoch.
-
- Raises:
- KeyError: If the run is not found.
- ValueError: If the run has no events loaded and there are no events on
- disk to load.
- """
- accumulator = self._GetAccumulator(run)
- return accumulator.FirstEventTimestamp()
-
- def Scalars(self, run, tag):
- """Retrieve the scalar events associated with a run and tag.
-
- Args:
- run: A string name of the run for which values are retrieved.
- tag: A string name of the tag for which values are retrieved.
-
- Raises:
- KeyError: If the run is not found, or the tag is not available for
- the given run.
-
- Returns:
- An array of `event_accumulator.ScalarEvents`.
- """
- accumulator = self._GetAccumulator(run)
- return accumulator.Scalars(tag)
-
- def HealthPills(self, run, node_name):
- """Retrieve the health pill events associated with a run and node name.
-
- Args:
- run: A string name of the run for which health pills are retrieved.
- node_name: A string name of the node for which health pills are retrieved.
-
- Raises:
- KeyError: If the run is not found, or the node name is not available for
- the given run.
-
- Returns:
- An array of `event_accumulator.HealthPillEvents`.
- """
- accumulator = self._GetAccumulator(run)
- return accumulator.HealthPills(node_name)
-
- def Graph(self, run):
- """Retrieve the graph associated with the provided run.
-
- Args:
- run: A string name of a run to load the graph for.
-
- Raises:
- KeyError: If the run is not found.
- ValueError: If the run does not have an associated graph.
-
- Returns:
- The `GraphDef` protobuf data structure.
- """
- accumulator = self._GetAccumulator(run)
- return accumulator.Graph()
-
- def MetaGraph(self, run):
- """Retrieve the metagraph associated with the provided run.
-
- Args:
- run: A string name of a run to load the graph for.
-
- Raises:
- KeyError: If the run is not found.
- ValueError: If the run does not have an associated graph.
-
- Returns:
- The `MetaGraphDef` protobuf data structure.
- """
- accumulator = self._GetAccumulator(run)
- return accumulator.MetaGraph()
-
- def RunMetadata(self, run, tag):
- """Get the session.run() metadata associated with a TensorFlow run and tag.
-
- Args:
- run: A string name of a TensorFlow run.
- tag: A string name of the tag associated with a particular session.run().
-
- Raises:
- KeyError: If the run is not found, or the tag is not available for the
- given run.
-
- Returns:
- The metadata in the form of `RunMetadata` protobuf data structure.
- """
- accumulator = self._GetAccumulator(run)
- return accumulator.RunMetadata(tag)
-
- def Histograms(self, run, tag):
- """Retrieve the histogram events associated with a run and tag.
-
- Args:
- run: A string name of the run for which values are retrieved.
- tag: A string name of the tag for which values are retrieved.
-
- Raises:
- KeyError: If the run is not found, or the tag is not available for
- the given run.
-
- Returns:
- An array of `event_accumulator.HistogramEvents`.
- """
- accumulator = self._GetAccumulator(run)
- return accumulator.Histograms(tag)
-
- def CompressedHistograms(self, run, tag):
- """Retrieve the compressed histogram events associated with a run and tag.
-
- Args:
- run: A string name of the run for which values are retrieved.
- tag: A string name of the tag for which values are retrieved.
-
- Raises:
- KeyError: If the run is not found, or the tag is not available for
- the given run.
-
- Returns:
- An array of `event_accumulator.CompressedHistogramEvents`.
- """
- accumulator = self._GetAccumulator(run)
- return accumulator.CompressedHistograms(tag)
-
- def Images(self, run, tag):
- """Retrieve the image events associated with a run and tag.
-
- Args:
- run: A string name of the run for which values are retrieved.
- tag: A string name of the tag for which values are retrieved.
-
- Raises:
- KeyError: If the run is not found, or the tag is not available for
- the given run.
-
- Returns:
- An array of `event_accumulator.ImageEvents`.
- """
- accumulator = self._GetAccumulator(run)
- return accumulator.Images(tag)
-
- def Audio(self, run, tag):
- """Retrieve the audio events associated with a run and tag.
-
- Args:
- run: A string name of the run for which values are retrieved.
- tag: A string name of the tag for which values are retrieved.
-
- Raises:
- KeyError: If the run is not found, or the tag is not available for
- the given run.
-
- Returns:
- An array of `event_accumulator.AudioEvents`.
- """
- accumulator = self._GetAccumulator(run)
- return accumulator.Audio(tag)
-
- def Tensors(self, run, tag):
- """Retrieve the tensor events associated with a run and tag.
-
- Args:
- run: A string name of the run for which values are retrieved.
- tag: A string name of the tag for which values are retrieved.
-
- Raises:
- KeyError: If the run is not found, or the tag is not available for
- the given run.
-
- Returns:
- An array of `event_accumulator.TensorEvent`s.
- """
- accumulator = self._GetAccumulator(run)
- return accumulator.Tensors(tag)
-
- def Runs(self):
- """Return all the run names in the `EventMultiplexer`.
-
- Returns:
- ```
- {runName: { images: [tag1, tag2, tag3],
- scalarValues: [tagA, tagB, tagC],
- histograms: [tagX, tagY, tagZ],
- compressedHistograms: [tagX, tagY, tagZ],
- graph: true, meta_graph: true}}
- ```
- """
- with self._accumulators_mutex:
- # To avoid nested locks, we construct a copy of the run-accumulator map
- items = list(six.iteritems(self._accumulators))
- return {run_name: accumulator.Tags() for run_name, accumulator in items}
-
- def RunPaths(self):
- """Returns a dict mapping run names to event file paths."""
- return self._paths
-
- def _GetAccumulator(self, run):
- with self._accumulators_mutex:
- return self._accumulators[run]
-
-
-def GetLogdirSubdirectories(path):
- """Returns subdirectories with event files on path."""
- if gfile.Exists(path) and not gfile.IsDirectory(path):
- raise ValueError('GetLogdirSubdirectories: path exists and is not a '
- 'directory, %s' % path)
-
- # ListRecursively just yields nothing if the path doesn't exist.
- return (
- subdir
- for (subdir, files) in io_wrapper.ListRecursively(path)
- if list(filter(event_accumulator.IsTensorFlowEventsFile, files))
- )
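For orientation, a minimal usage sketch of the multiplexer above (not part of the deleted file), assuming the pre-move import paths removed by this change and a placeholder log directory with one subdirectory per run:

```python
# Hypothetical usage; '/tmp/logs' stands in for a directory of run subdirs.
from tensorflow.python.summary import event_accumulator
from tensorflow.python.summary import event_multiplexer

multiplexer = event_multiplexer.EventMultiplexer()
multiplexer.AddRunsFromDirectory('/tmp/logs')
multiplexer.Reload()

for run, tags in multiplexer.Runs().items():
  for tag in tags[event_accumulator.SCALARS]:
    # Each entry is a ScalarEvent(wall_time, step, value) namedtuple.
    print(run, tag, multiplexer.Scalars(run, tag)[:3])
```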
diff --git a/tensorflow/python/summary/event_multiplexer_test.py b/tensorflow/python/summary/event_multiplexer_test.py
deleted file mode 100644
index 8f78c6c547..0000000000
--- a/tensorflow/python/summary/event_multiplexer_test.py
+++ /dev/null
@@ -1,319 +0,0 @@
-# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ==============================================================================
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-import os
-import os.path
-import shutil
-
-from tensorflow.python.framework import test_util
-from tensorflow.python.platform import gfile
-from tensorflow.python.platform import googletest
-from tensorflow.python.summary import event_accumulator
-from tensorflow.python.summary import event_multiplexer
-
-
-def _AddEvents(path):
- if not gfile.IsDirectory(path):
- gfile.MakeDirs(path)
- fpath = os.path.join(path, 'hypothetical.tfevents.out')
- with gfile.GFile(fpath, 'w') as f:
- f.write('')
- return fpath
-
-
-def _CreateCleanDirectory(path):
- if gfile.IsDirectory(path):
- gfile.DeleteRecursively(path)
- gfile.MkDir(path)
-
-
-class _FakeAccumulator(object):
-
- def __init__(self, path):
- self._path = path
- self.reload_called = False
- self._node_names_to_health_pills = {'Add': ['hp1', 'hp2']}
-
- def Tags(self):
- return {event_accumulator.IMAGES: ['im1', 'im2'],
- event_accumulator.AUDIO: ['snd1', 'snd2'],
- event_accumulator.HISTOGRAMS: ['hst1', 'hst2'],
- event_accumulator.COMPRESSED_HISTOGRAMS: ['cmphst1', 'cmphst2'],
- event_accumulator.SCALARS: ['sv1', 'sv2']}
-
- def FirstEventTimestamp(self):
- return 0
-
- def _TagHelper(self, tag_name, enum):
- if tag_name not in self.Tags()[enum]:
- raise KeyError
- return ['%s/%s' % (self._path, tag_name)]
-
- def Scalars(self, tag_name):
- return self._TagHelper(tag_name, event_accumulator.SCALARS)
-
- def HealthPills(self, node_name):
- if node_name not in self._node_names_to_health_pills:
- raise KeyError
- health_pills = self._node_names_to_health_pills[node_name]
- return [self._path + '/' + health_pill for health_pill in health_pills]
-
- def Histograms(self, tag_name):
- return self._TagHelper(tag_name, event_accumulator.HISTOGRAMS)
-
- def CompressedHistograms(self, tag_name):
- return self._TagHelper(tag_name, event_accumulator.COMPRESSED_HISTOGRAMS)
-
- def Images(self, tag_name):
- return self._TagHelper(tag_name, event_accumulator.IMAGES)
-
- def Audio(self, tag_name):
- return self._TagHelper(tag_name, event_accumulator.AUDIO)
-
- def Tensors(self, tag_name):
- return self._TagHelper(tag_name, event_accumulator.TENSORS)
-
- def Reload(self):
- self.reload_called = True
-
-
-# pylint: disable=unused-argument
-def _GetFakeAccumulator(
- path,
- size_guidance=None,
- compression_bps=None,
- purge_orphaned_data=None):
- return _FakeAccumulator(path)
-# pylint: enable=unused-argument
-
-
-class EventMultiplexerTest(test_util.TensorFlowTestCase):
-
- def setUp(self):
- super(EventMultiplexerTest, self).setUp()
- self.stubs = googletest.StubOutForTesting()
-
- self.stubs.Set(event_accumulator, 'EventAccumulator', _GetFakeAccumulator)
-
- def tearDown(self):
- self.stubs.CleanUp()
-
- def testEmptyLoader(self):
- x = event_multiplexer.EventMultiplexer()
- self.assertEqual(x.Runs(), {})
-
- def testRunNamesRespected(self):
- x = event_multiplexer.EventMultiplexer({'run1': 'path1', 'run2': 'path2'})
- self.assertItemsEqual(sorted(x.Runs().keys()), ['run1', 'run2'])
- self.assertEqual(x._GetAccumulator('run1')._path, 'path1')
- self.assertEqual(x._GetAccumulator('run2')._path, 'path2')
-
- def testReload(self):
- x = event_multiplexer.EventMultiplexer({'run1': 'path1', 'run2': 'path2'})
- self.assertFalse(x._GetAccumulator('run1').reload_called)
- self.assertFalse(x._GetAccumulator('run2').reload_called)
- x.Reload()
- self.assertTrue(x._GetAccumulator('run1').reload_called)
- self.assertTrue(x._GetAccumulator('run2').reload_called)
-
- def testScalars(self):
- x = event_multiplexer.EventMultiplexer({'run1': 'path1', 'run2': 'path2'})
-
- run1_actual = x.Scalars('run1', 'sv1')
- run1_expected = ['path1/sv1']
-
- self.assertEqual(run1_expected, run1_actual)
-
- def testHealthPills(self):
- x = event_multiplexer.EventMultiplexer({'run1': 'path1', 'run2': 'path2'})
- self.assertEqual(['path1/hp1', 'path1/hp2'], x.HealthPills('run1', 'Add'))
-
- def testExceptions(self):
- x = event_multiplexer.EventMultiplexer({'run1': 'path1', 'run2': 'path2'})
- with self.assertRaises(KeyError):
- x.Scalars('sv1', 'xxx')
-
- def testInitialization(self):
- x = event_multiplexer.EventMultiplexer()
- self.assertEqual(x.Runs(), {})
- x = event_multiplexer.EventMultiplexer({'run1': 'path1', 'run2': 'path2'})
- self.assertItemsEqual(x.Runs(), ['run1', 'run2'])
- self.assertEqual(x._GetAccumulator('run1')._path, 'path1')
- self.assertEqual(x._GetAccumulator('run2')._path, 'path2')
-
- def testAddRunsFromDirectory(self):
- x = event_multiplexer.EventMultiplexer()
- tmpdir = self.get_temp_dir()
- join = os.path.join
- fakedir = join(tmpdir, 'fake_accumulator_directory')
- realdir = join(tmpdir, 'real_accumulator_directory')
- self.assertEqual(x.Runs(), {})
- x.AddRunsFromDirectory(fakedir)
- self.assertEqual(x.Runs(), {}, 'loading fakedir had no effect')
-
- _CreateCleanDirectory(realdir)
- x.AddRunsFromDirectory(realdir)
- self.assertEqual(x.Runs(), {}, 'loading empty directory had no effect')
-
- path1 = join(realdir, 'path1')
- gfile.MkDir(path1)
- x.AddRunsFromDirectory(realdir)
- self.assertEqual(x.Runs(), {}, 'creating empty subdirectory had no effect')
-
- _AddEvents(path1)
- x.AddRunsFromDirectory(realdir)
- self.assertItemsEqual(x.Runs(), ['path1'], 'loaded run: path1')
- loader1 = x._GetAccumulator('path1')
- self.assertEqual(loader1._path, path1, 'has the correct path')
-
- path2 = join(realdir, 'path2')
- _AddEvents(path2)
- x.AddRunsFromDirectory(realdir)
- self.assertItemsEqual(x.Runs(), ['path1', 'path2'])
- self.assertEqual(
- x._GetAccumulator('path1'), loader1, 'loader1 not regenerated')
-
- path2_2 = join(path2, 'path2')
- _AddEvents(path2_2)
- x.AddRunsFromDirectory(realdir)
- self.assertItemsEqual(x.Runs(), ['path1', 'path2', 'path2/path2'])
- self.assertEqual(
- x._GetAccumulator('path2/path2')._path, path2_2, 'loader2 path correct')
-
- def testAddRunsFromDirectoryThatContainsEvents(self):
- x = event_multiplexer.EventMultiplexer()
- tmpdir = self.get_temp_dir()
- join = os.path.join
- realdir = join(tmpdir, 'event_containing_directory')
-
- _CreateCleanDirectory(realdir)
-
- self.assertEqual(x.Runs(), {})
-
- _AddEvents(realdir)
- x.AddRunsFromDirectory(realdir)
- self.assertItemsEqual(x.Runs(), ['.'])
-
- subdir = join(realdir, 'subdir')
- _AddEvents(subdir)
- x.AddRunsFromDirectory(realdir)
- self.assertItemsEqual(x.Runs(), ['.', 'subdir'])
-
- def testAddRunsFromDirectoryWithRunNames(self):
- x = event_multiplexer.EventMultiplexer()
- tmpdir = self.get_temp_dir()
- join = os.path.join
- realdir = join(tmpdir, 'event_containing_directory')
-
- _CreateCleanDirectory(realdir)
-
- self.assertEqual(x.Runs(), {})
-
- _AddEvents(realdir)
- x.AddRunsFromDirectory(realdir, 'foo')
- self.assertItemsEqual(x.Runs(), ['foo/.'])
-
- subdir = join(realdir, 'subdir')
- _AddEvents(subdir)
- x.AddRunsFromDirectory(realdir, 'foo')
- self.assertItemsEqual(x.Runs(), ['foo/.', 'foo/subdir'])
-
- def testAddRunsFromDirectoryWalksTree(self):
- x = event_multiplexer.EventMultiplexer()
- tmpdir = self.get_temp_dir()
- join = os.path.join
- realdir = join(tmpdir, 'event_containing_directory')
-
- _CreateCleanDirectory(realdir)
- _AddEvents(realdir)
- sub = join(realdir, 'subdirectory')
- sub1 = join(sub, '1')
- sub2 = join(sub, '2')
- sub1_1 = join(sub1, '1')
- _AddEvents(sub1)
- _AddEvents(sub2)
- _AddEvents(sub1_1)
- x.AddRunsFromDirectory(realdir)
-
- self.assertItemsEqual(x.Runs(), ['.', 'subdirectory/1', 'subdirectory/2',
- 'subdirectory/1/1'])
-
- def testAddRunsFromDirectoryThrowsException(self):
- x = event_multiplexer.EventMultiplexer()
- tmpdir = self.get_temp_dir()
-
- filepath = _AddEvents(tmpdir)
- with self.assertRaises(ValueError):
- x.AddRunsFromDirectory(filepath)
-
- def testAddRun(self):
- x = event_multiplexer.EventMultiplexer()
- x.AddRun('run1_path', 'run1')
- run1 = x._GetAccumulator('run1')
- self.assertEqual(sorted(x.Runs().keys()), ['run1'])
- self.assertEqual(run1._path, 'run1_path')
-
- x.AddRun('run1_path', 'run1')
- self.assertEqual(run1, x._GetAccumulator('run1'), 'loader not recreated')
-
- x.AddRun('run2_path', 'run1')
- new_run1 = x._GetAccumulator('run1')
- self.assertEqual(new_run1._path, 'run2_path')
- self.assertNotEqual(run1, new_run1)
-
- x.AddRun('runName3')
- self.assertItemsEqual(sorted(x.Runs().keys()), ['run1', 'runName3'])
- self.assertEqual(x._GetAccumulator('runName3')._path, 'runName3')
-
- def testAddRunMaintainsLoading(self):
- x = event_multiplexer.EventMultiplexer()
- x.Reload()
- x.AddRun('run1')
- x.AddRun('run2')
- self.assertTrue(x._GetAccumulator('run1').reload_called)
- self.assertTrue(x._GetAccumulator('run2').reload_called)
-
-
-class EventMultiplexerWithRealAccumulatorTest(test_util.TensorFlowTestCase):
-
- def testDeletingDirectoryRemovesRun(self):
- x = event_multiplexer.EventMultiplexer()
- tmpdir = self.get_temp_dir()
- join = os.path.join
- run1_dir = join(tmpdir, 'run1')
- run2_dir = join(tmpdir, 'run2')
- run3_dir = join(tmpdir, 'run3')
-
- for dirname in [run1_dir, run2_dir, run3_dir]:
- _AddEvents(dirname)
-
- x.AddRun(run1_dir, 'run1')
- x.AddRun(run2_dir, 'run2')
- x.AddRun(run3_dir, 'run3')
-
- x.Reload()
-
- # Delete the directory, then reload.
- shutil.rmtree(run2_dir)
- x.Reload()
- self.assertNotIn('run2', x.Runs().keys())
-
-
-if __name__ == '__main__':
- googletest.main()
diff --git a/tensorflow/python/summary/impl/__init__.py b/tensorflow/python/summary/impl/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
--- a/tensorflow/python/summary/impl/__init__.py
+++ /dev/null
diff --git a/tensorflow/python/summary/impl/directory_watcher.py b/tensorflow/python/summary/impl/directory_watcher.py
deleted file mode 100644
index 799e01a836..0000000000
--- a/tensorflow/python/summary/impl/directory_watcher.py
+++ /dev/null
@@ -1,254 +0,0 @@
-# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ==============================================================================
-
-"""Contains the implementation for the DirectoryWatcher class."""
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-import bisect
-
-from tensorflow.python.framework import errors
-from tensorflow.python.platform import gfile
-from tensorflow.python.platform import tf_logging as logging
-from tensorflow.python.summary.impl import io_wrapper
-
-
-class DirectoryWatcher(object):
- """A DirectoryWatcher wraps a loader to load from a sequence of paths.
-
- A loader reads a path and produces some kind of values as an iterator. A
- DirectoryWatcher takes a directory, a factory for loaders, and optionally a
- path filter and watches all the paths inside that directory.
-
- This class is only valid under the assumption that only one path will be
- written to by the data source at a time and that once the source stops writing
- to a path, it will start writing to a new path that's lexicographically
- greater and never come back. It uses some heuristics to check whether this is
- true based on tracking changes to the files' sizes, but the check can have
- false negatives. However, it should have no false positives.
- """
-
- def __init__(self, directory, loader_factory, path_filter=lambda x: True):
- """Constructs a new DirectoryWatcher.
-
- Args:
- directory: The directory to load files from.
- loader_factory: A factory for creating loaders. The factory should take a
- path and return an object that has a Load method returning an
- iterator that will yield all events that have not been yielded yet.
- path_filter: If specified, only paths matching this filter are loaded.
-
- Raises:
- ValueError: If directory or loader_factory is None.
- """
- if directory is None:
- raise ValueError('A directory is required')
- if loader_factory is None:
- raise ValueError('A loader factory is required')
- self._directory = directory
- self._path = None
- self._loader_factory = loader_factory
- self._loader = None
- self._path_filter = path_filter
- self._ooo_writes_detected = False
- # The file size for each file at the time it was finalized.
- self._finalized_sizes = {}
-
- def Load(self):
- """Loads new values.
-
- The watcher will load from one path at a time; as soon as that path stops
- yielding events, it will move on to the next path. We assume that old paths
- are never modified after a newer path has been written. As a result, Load()
- can be called multiple times in a row without losing events that have not
- been yielded yet. In other words, we guarantee that every event will be
- yielded exactly once.
-
- Yields:
- All values that have not been yielded yet.
-
- Raises:
- DirectoryDeletedError: If the directory has been permanently deleted
- (as opposed to being temporarily unavailable).
- """
- try:
- for event in self._LoadInternal():
- yield event
- except errors.OpError:
- if not gfile.Exists(self._directory):
- raise DirectoryDeletedError(
- 'Directory %s has been permanently deleted' % self._directory)
-
- def _LoadInternal(self):
- """Internal implementation of Load().
-
- The only difference between this and Load() is that the latter will throw
- DirectoryDeletedError on I/O errors if it thinks that the directory has been
- permanently deleted.
-
- Yields:
- All values that have not been yielded yet.
- """
-
- # If the loader hasn't been initialized yet, initialize it now.
- if not self._loader:
- self._InitializeLoader()
-
- while True:
- # Yield all the new events in the path we're currently loading from.
- for event in self._loader.Load():
- yield event
-
- next_path = self._GetNextPath()
- if not next_path:
- logging.info('No path found after %s', self._path)
- # Current path is empty and there are no new paths, so we're done.
- return
-
- # There's a new path, so check to make sure there weren't any events
- # written between when we finished reading the current path and when we
- # checked for the new one. The sequence of events might look something
- # like this:
- #
- # 1. Event #1 written to path #1.
- # 2. We check for events and yield event #1 from path #1
- # 3. We check for events and see that there are no more events in path #1.
- # 4. Event #2 is written to path #1.
- # 5. Event #3 is written to path #2.
- # 6. We check for a new path and see that path #2 exists.
- #
- # Without this loop, we would miss event #2. We're also guaranteed by the
- # loader contract that no more events will be written to path #1 after
- # events start being written to path #2, so we don't have to worry about
- # that.
- for event in self._loader.Load():
- yield event
-
- logging.info('Directory watcher advancing from %s to %s', self._path,
- next_path)
-
- # Advance to the next path and start over.
- self._SetPath(next_path)
-
- # The number of paths before the current one to check for out of order writes.
- _OOO_WRITE_CHECK_COUNT = 20
-
- def OutOfOrderWritesDetected(self):
- """Returns whether any out-of-order writes have been detected.
-
- Out-of-order writes are only checked as part of the Load() iterator. Once an
- out-of-order write is detected, this function will always return true.
-
- Note that out-of-order write detection is not performed on GCS paths, so
- this function will always return false.
-
- Returns:
- Whether any out-of-order write has ever been detected by this watcher.
-
- """
- return self._ooo_writes_detected
-
- def _InitializeLoader(self):
- path = self._GetNextPath()
- if path:
- self._SetPath(path)
- else:
- raise StopIteration
-
- def _SetPath(self, path):
- """Sets the current path to watch for new events.
-
- This also records the size of the old path, if any. If the size can't be
- found, an error is logged.
-
- Args:
- path: The full path of the file to watch.
- """
- old_path = self._path
- if old_path and not io_wrapper.IsGCSPath(old_path):
- try:
- # We're done with the path, so store its size.
- size = gfile.Stat(old_path).length
- logging.debug('Setting latest size of %s to %d', old_path, size)
- self._finalized_sizes[old_path] = size
- except errors.OpError as e:
- logging.error('Unable to get size of %s: %s', old_path, e)
-
- self._path = path
- self._loader = self._loader_factory(path)
-
- def _GetNextPath(self):
- """Gets the next path to load from.
-
- This function also does the checking for out-of-order writes as it iterates
- through the paths.
-
- Returns:
- The next path to load events from, or None if there are no more paths.
- """
- paths = sorted(path
- for path in io_wrapper.ListDirectoryAbsolute(self._directory)
- if self._path_filter(path))
- if not paths:
- return None
-
- if self._path is None:
- return paths[0]
-
- # Don't bother checking if the paths are GCS (which we can't check) or if
- # we've already detected an OOO write.
- if not io_wrapper.IsGCSPath(paths[0]) and not self._ooo_writes_detected:
- # Check the previous _OOO_WRITE_CHECK_COUNT paths for out-of-order writes.
- current_path_index = bisect.bisect_left(paths, self._path)
- ooo_check_start = max(0, current_path_index - self._OOO_WRITE_CHECK_COUNT)
- for path in paths[ooo_check_start:current_path_index]:
- if self._HasOOOWrite(path):
- self._ooo_writes_detected = True
- break
-
- next_paths = list(path
- for path in paths
- if self._path is None or path > self._path)
- if next_paths:
- return min(next_paths)
- else:
- return None
-
- def _HasOOOWrite(self, path):
- """Returns whether the path has had an out-of-order write."""
- # Check the sizes of each path before the current one.
- size = gfile.Stat(path).length
- old_size = self._finalized_sizes.get(path, None)
- if size != old_size:
- if old_size is None:
- logging.error('File %s created after file %s even though it\'s '
- 'lexicographically earlier', path, self._path)
- else:
- logging.error('File %s updated even though the current file is %s',
- path, self._path)
- return True
- else:
- return False
-
-
-class DirectoryDeletedError(Exception):
- """Thrown by Load() when the directory is *permanently* gone.
-
- We distinguish this from temporary errors so that other code can decide to
- drop all of our data only when a directory has been intentionally deleted,
- as opposed to due to transient filesystem errors.
- """
- pass
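In TensorBoard proper the watcher above is driven by the event accumulator, but a minimal sketch of the polling loop looks roughly like this (the log directory, path filter, and sleep interval are illustrative, not part of this module):

import time

from tensorflow.python.summary.impl import directory_watcher
from tensorflow.python.summary.impl import event_file_loader

# Watch a (hypothetical) run directory; EventFileLoader is the loader factory
# that DirectoryWatcher calls with each new path it advances to.
watcher = directory_watcher.DirectoryWatcher(
    '/tmp/run1',                        # illustrative directory
    event_file_loader.EventFileLoader,  # path -> loader
    lambda path: 'tfevents' in path)    # skip non-event files

while True:
  for event in watcher.Load():          # each event is yielded exactly once
    print(event.step, event.wall_time)
  if watcher.OutOfOrderWritesDetected():
    print('Warning: a lexicographically earlier file changed after reading.')
  time.sleep(5)                         # arbitrary polling interval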
diff --git a/tensorflow/python/summary/impl/directory_watcher_test.py b/tensorflow/python/summary/impl/directory_watcher_test.py
deleted file mode 100644
index b6ecc15849..0000000000
--- a/tensorflow/python/summary/impl/directory_watcher_test.py
+++ /dev/null
@@ -1,209 +0,0 @@
-# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ==============================================================================
-
-"""Tests for directory_watcher."""
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-import os
-import shutil
-
-from tensorflow.python.framework import test_util
-from tensorflow.python.platform import gfile
-from tensorflow.python.platform import googletest
-from tensorflow.python.summary.impl import directory_watcher
-from tensorflow.python.summary.impl import io_wrapper
-
-
-class _ByteLoader(object):
- """A loader that loads individual bytes from a file."""
-
- def __init__(self, path):
- self._f = open(path)
- self.bytes_read = 0
-
- def Load(self):
- while True:
- self._f.seek(self.bytes_read)
- byte = self._f.read(1)
- if byte:
- self.bytes_read += 1
- yield byte
- else:
- return
-
-
-class DirectoryWatcherTest(test_util.TensorFlowTestCase):
-
- def setUp(self):
- # Put everything in a directory so it's easier to delete.
- self._directory = os.path.join(self.get_temp_dir(), 'monitor_dir')
- os.mkdir(self._directory)
- self._watcher = directory_watcher.DirectoryWatcher(self._directory,
- _ByteLoader)
- self.stubs = googletest.StubOutForTesting()
-
- def tearDown(self):
- self.stubs.CleanUp()
- try:
- shutil.rmtree(self._directory)
- except OSError:
- # Some tests delete the directory.
- pass
-
- def _WriteToFile(self, filename, data):
- path = os.path.join(self._directory, filename)
- with open(path, 'a') as f:
- f.write(data)
-
- def _LoadAllEvents(self):
- """Loads all events in the watcher."""
- for _ in self._watcher.Load():
- pass
-
- def assertWatcherYields(self, values):
- self.assertEqual(list(self._watcher.Load()), values)
-
- def testRaisesWithBadArguments(self):
- with self.assertRaises(ValueError):
- directory_watcher.DirectoryWatcher(None, lambda x: None)
- with self.assertRaises(ValueError):
- directory_watcher.DirectoryWatcher('dir', None)
-
- def testEmptyDirectory(self):
- self.assertWatcherYields([])
-
- def testSingleWrite(self):
- self._WriteToFile('a', 'abc')
- self.assertWatcherYields(['a', 'b', 'c'])
- self.assertFalse(self._watcher.OutOfOrderWritesDetected())
-
- def testMultipleWrites(self):
- self._WriteToFile('a', 'abc')
- self.assertWatcherYields(['a', 'b', 'c'])
- self._WriteToFile('a', 'xyz')
- self.assertWatcherYields(['x', 'y', 'z'])
- self.assertFalse(self._watcher.OutOfOrderWritesDetected())
-
- def testMultipleLoads(self):
- self._WriteToFile('a', 'a')
- self._watcher.Load()
- self._watcher.Load()
- self.assertWatcherYields(['a'])
- self.assertFalse(self._watcher.OutOfOrderWritesDetected())
-
- def testMultipleFilesAtOnce(self):
- self._WriteToFile('b', 'b')
- self._WriteToFile('a', 'a')
- self.assertWatcherYields(['a', 'b'])
- self.assertFalse(self._watcher.OutOfOrderWritesDetected())
-
- def testFinishesLoadingFileWhenSwitchingToNewFile(self):
- self._WriteToFile('a', 'a')
- # Empty the iterator.
- self.assertEqual(['a'], list(self._watcher.Load()))
- self._WriteToFile('a', 'b')
- self._WriteToFile('b', 'c')
- # The watcher should finish its current file before starting a new one.
- self.assertWatcherYields(['b', 'c'])
- self.assertFalse(self._watcher.OutOfOrderWritesDetected())
-
- def testIntermediateEmptyFiles(self):
- self._WriteToFile('a', 'a')
- self._WriteToFile('b', '')
- self._WriteToFile('c', 'c')
- self.assertWatcherYields(['a', 'c'])
- self.assertFalse(self._watcher.OutOfOrderWritesDetected())
-
- def testPathFilter(self):
- self._watcher = directory_watcher.DirectoryWatcher(
- self._directory, _ByteLoader,
- lambda path: 'do_not_watch_me' not in path)
-
- self._WriteToFile('a', 'a')
- self._WriteToFile('do_not_watch_me', 'b')
- self._WriteToFile('c', 'c')
- self.assertWatcherYields(['a', 'c'])
- self.assertFalse(self._watcher.OutOfOrderWritesDetected())
-
- def testDetectsNewOldFiles(self):
- self._WriteToFile('b', 'a')
- self._LoadAllEvents()
- self._WriteToFile('a', 'a')
- self._LoadAllEvents()
- self.assertTrue(self._watcher.OutOfOrderWritesDetected())
-
- def testIgnoresNewerFiles(self):
- self._WriteToFile('a', 'a')
- self._LoadAllEvents()
- self._WriteToFile('q', 'a')
- self._LoadAllEvents()
- self.assertFalse(self._watcher.OutOfOrderWritesDetected())
-
- def testDetectsChangingOldFiles(self):
- self._WriteToFile('a', 'a')
- self._WriteToFile('b', 'a')
- self._LoadAllEvents()
- self._WriteToFile('a', 'c')
- self._LoadAllEvents()
- self.assertTrue(self._watcher.OutOfOrderWritesDetected())
-
- def testDoesntCrashWhenFileIsDeleted(self):
- self._WriteToFile('a', 'a')
- self._LoadAllEvents()
- os.remove(os.path.join(self._directory, 'a'))
- self._WriteToFile('b', 'b')
- self.assertWatcherYields(['b'])
-
- def testRaisesRightErrorWhenDirectoryIsDeleted(self):
- self._WriteToFile('a', 'a')
- self._LoadAllEvents()
- shutil.rmtree(self._directory)
- with self.assertRaises(directory_watcher.DirectoryDeletedError):
- self._LoadAllEvents()
-
- def testDoesntRaiseDirectoryDeletedErrorIfOutageIsTransient(self):
- self._WriteToFile('a', 'a')
- self._LoadAllEvents()
- shutil.rmtree(self._directory)
-
- # Fake a single transient I/O error.
- def FakeFactory(original):
-
- def Fake(*args, **kwargs):
- if FakeFactory.has_been_called:
- return original(*args, **kwargs)
- else:
- FakeFactory.has_been_called = True
- raise OSError('lp0 temporarily on fire')
-
- return Fake
-
- FakeFactory.has_been_called = False
-
- for stub_name in ['ListDirectoryAbsolute', 'ListRecursively']:
- self.stubs.Set(io_wrapper, stub_name,
- FakeFactory(getattr(io_wrapper, stub_name)))
- for stub_name in ['IsDirectory', 'Exists', 'Stat']:
- self.stubs.Set(gfile, stub_name,
- FakeFactory(getattr(gfile, stub_name)))
-
- with self.assertRaises((IOError, OSError)):
- self._LoadAllEvents()
-
-
-if __name__ == '__main__':
- googletest.main()
diff --git a/tensorflow/python/summary/impl/event_file_loader.py b/tensorflow/python/summary/impl/event_file_loader.py
deleted file mode 100644
index ccc61d4564..0000000000
--- a/tensorflow/python/summary/impl/event_file_loader.py
+++ /dev/null
@@ -1,80 +0,0 @@
-# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ==============================================================================
-
-"""Functionality for loading events from a record file."""
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-from tensorflow.core.util import event_pb2
-from tensorflow.python import pywrap_tensorflow
-from tensorflow.python.framework import errors
-from tensorflow.python.platform import app
-from tensorflow.python.platform import resource_loader
-from tensorflow.python.platform import tf_logging as logging
-from tensorflow.python.util import compat
-
-
-class EventFileLoader(object):
- """An EventLoader is an iterator that yields Event protos."""
-
- def __init__(self, file_path):
- if file_path is None:
- raise ValueError('A file path is required')
- file_path = resource_loader.readahead_file_path(file_path)
- logging.debug('Opening a record reader pointing at %s', file_path)
- with errors.raise_exception_on_not_ok_status() as status:
- self._reader = pywrap_tensorflow.PyRecordReader_New(
- compat.as_bytes(file_path), 0, compat.as_bytes(''), status)
- # Store it for logging purposes.
- self._file_path = file_path
- if not self._reader:
- raise IOError('Failed to open a record reader pointing to %s' % file_path)
-
- def Load(self):
- """Loads all new values from disk.
-
- Calling Load multiple times in a row will not 'drop' events as long as the
- return value is not iterated over.
-
- Yields:
- All values that were written to disk that have not been yielded yet.
- """
- while True:
- try:
- with errors.raise_exception_on_not_ok_status() as status:
- self._reader.GetNext(status)
- except (errors.DataLossError, errors.OutOfRangeError):
- # We ignore partial read exceptions, because a record may be truncated.
- # PyRecordReader holds the offset prior to the failed read, so retrying
- # will succeed.
- break
- event = event_pb2.Event()
- event.ParseFromString(self._reader.record())
- yield event
- logging.debug('No more events in %s', self._file_path)
-
-
-def main(argv):
- if len(argv) != 2:
- print('Usage: event_file_loader <path-to-the-recordio-file>')
- return 1
- loader = EventFileLoader(argv[1])
- for event in loader.Load():
- print(event)
-
-
-if __name__ == '__main__':
- app.run()
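As a rough illustration of how this loader pairs with the standard summary writer, here is a sketch assuming the contemporary tf.summary.FileWriter and tf.gfile APIs; the events-file name pattern is the conventional one and is not guaranteed by this module:

import tensorflow as tf

from tensorflow.python.summary.impl import event_file_loader

logdir = '/tmp/loader_demo'   # illustrative path
writer = tf.summary.FileWriter(logdir)
for step in range(3):
  summary = tf.Summary(
      value=[tf.Summary.Value(tag='loss', simple_value=1.0 / (step + 1))])
  writer.add_summary(summary, global_step=step)
writer.close()

# The FileWriter produces a single tfevents file under logdir.
events_path = tf.gfile.Glob(logdir + '/events.out.tfevents.*')[0]
loader = event_file_loader.EventFileLoader(events_path)
# Typically 4 events: an initial file_version event plus the 3 summaries.
print(len(list(loader.Load())))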
diff --git a/tensorflow/python/summary/impl/event_file_loader_test.py b/tensorflow/python/summary/impl/event_file_loader_test.py
deleted file mode 100644
index 0b354d553d..0000000000
--- a/tensorflow/python/summary/impl/event_file_loader_test.py
+++ /dev/null
@@ -1,92 +0,0 @@
-# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ==============================================================================
-
-"""Tests for event_file_loader."""
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-import os
-import tempfile
-
-from tensorflow.python.framework import test_util
-from tensorflow.python.platform import googletest
-from tensorflow.python.summary.impl import event_file_loader
-
-
-class EventFileLoaderTest(test_util.TensorFlowTestCase):
- # A record containing a simple event.
- RECORD = (b'\x18\x00\x00\x00\x00\x00\x00\x00\xa3\x7fK"\t\x00\x00\xc0%\xddu'
- b'\xd5A\x1a\rbrain.Event:1\xec\xf32\x8d')
-
- def _WriteToFile(self, filename, data):
- with open(filename, 'ab') as f:
- f.write(data)
-
- def _LoaderForTestFile(self, filename):
- return event_file_loader.EventFileLoader(
- os.path.join(self.get_temp_dir(), filename))
-
- def testEmptyEventFile(self):
- filename = tempfile.NamedTemporaryFile(dir=self.get_temp_dir()).name
- self._WriteToFile(filename, b'')
- loader = self._LoaderForTestFile(filename)
- self.assertEqual(len(list(loader.Load())), 0)
-
- def testSingleWrite(self):
- filename = tempfile.NamedTemporaryFile(dir=self.get_temp_dir()).name
- self._WriteToFile(filename, EventFileLoaderTest.RECORD)
- loader = self._LoaderForTestFile(filename)
- events = list(loader.Load())
- self.assertEqual(len(events), 1)
- self.assertEqual(events[0].wall_time, 1440183447.0)
- self.assertEqual(len(list(loader.Load())), 0)
-
- def testMultipleWrites(self):
- filename = tempfile.NamedTemporaryFile(dir=self.get_temp_dir()).name
- self._WriteToFile(filename, EventFileLoaderTest.RECORD)
- loader = self._LoaderForTestFile(filename)
- self.assertEqual(len(list(loader.Load())), 1)
- self._WriteToFile(filename, EventFileLoaderTest.RECORD)
- self.assertEqual(len(list(loader.Load())), 1)
-
- def testMultipleLoads(self):
- filename = tempfile.NamedTemporaryFile(dir=self.get_temp_dir()).name
- self._WriteToFile(filename, EventFileLoaderTest.RECORD)
- loader = self._LoaderForTestFile(filename)
- loader.Load()
- loader.Load()
- self.assertEqual(len(list(loader.Load())), 1)
-
- def testMultipleWritesAtOnce(self):
- filename = tempfile.NamedTemporaryFile(dir=self.get_temp_dir()).name
- self._WriteToFile(filename, EventFileLoaderTest.RECORD)
- self._WriteToFile(filename, EventFileLoaderTest.RECORD)
- loader = self._LoaderForTestFile(filename)
- self.assertEqual(len(list(loader.Load())), 2)
-
- def testMultipleWritesWithBadWrite(self):
- filename = tempfile.NamedTemporaryFile(dir=self.get_temp_dir()).name
- self._WriteToFile(filename, EventFileLoaderTest.RECORD)
- self._WriteToFile(filename, EventFileLoaderTest.RECORD)
- # Test that we ignore partial record writes at the end of the file.
- self._WriteToFile(filename, b'123')
- loader = self._LoaderForTestFile(filename)
- self.assertEqual(len(list(loader.Load())), 2)
-
-
-if __name__ == '__main__':
- googletest.main()
diff --git a/tensorflow/python/summary/impl/io_wrapper.py b/tensorflow/python/summary/impl/io_wrapper.py
deleted file mode 100644
index 258fe8c804..0000000000
--- a/tensorflow/python/summary/impl/io_wrapper.py
+++ /dev/null
@@ -1,52 +0,0 @@
-# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ==============================================================================
-"""IO helper functions."""
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-import os
-
-from tensorflow.python.platform import gfile
-
-
-def IsGCSPath(path):
- return path.startswith("gs://")
-
-
-def ListDirectoryAbsolute(directory):
- """Yields all files in the given directory. The paths are absolute."""
- return (os.path.join(directory, path)
- for path in gfile.ListDirectory(directory))
-
-
-def ListRecursively(top):
- """Walks a directory tree, yielding (dir_path, file_paths) tuples.
-
- For each of `top` and its subdirectories, yields a tuple containing the path
- to the directory and the path to each of the contained files. Note that
- unlike os.walk()/gfile.Walk(), this does not list subdirectories, and the file
- paths are all absolute.
-
- If the directory does not exist, this yields nothing.
-
- Args:
- top: A path to a directory.
- Yields:
- A list of (dir_path, file_paths) tuples.
- """
- for dir_path, _, filenames in gfile.Walk(top):
- yield (dir_path, (os.path.join(dir_path, filename)
- for filename in filenames))
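A small usage sketch for the helper above (the directory is illustrative):

from tensorflow.python.summary.impl import io_wrapper

all_files = []
for dir_path, file_paths in io_wrapper.ListRecursively('/tmp/logs'):
  # file_paths is a generator of absolute paths within dir_path.
  all_files.extend(file_paths)
print(sorted(all_files))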
diff --git a/tensorflow/python/summary/impl/reservoir.py b/tensorflow/python/summary/impl/reservoir.py
deleted file mode 100644
index 0a1252e635..0000000000
--- a/tensorflow/python/summary/impl/reservoir.py
+++ /dev/null
@@ -1,253 +0,0 @@
-# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ==============================================================================
-
-"""A key-value[] store that implements reservoir sampling on the values."""
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-import collections
-import random
-import threading
-
-
-class Reservoir(object):
- """A map-to-arrays container, with deterministic Reservoir Sampling.
-
- Items are added with an associated key. Items may be retrieved by key, and
- a list of keys can also be retrieved. If size is not zero, then it dictates
- the maximum number of items that will be stored with each key. Once there are
- more items for a given key, they are replaced via reservoir sampling, such
- that each item has an equal probability of being included in the sample.
-
- Deterministic means that for any given seed and bucket size, the sequence of
- values that are kept for any given tag will always be the same, and that this
- is independent of any insertions on other tags. That is:
-
- >>> separate_reservoir = reservoir.Reservoir(10)
- >>> interleaved_reservoir = reservoir.Reservoir(10)
- >>> for i in xrange(100):
- >>> separate_reservoir.AddItem('key1', i)
- >>> for i in xrange(100):
- >>> separate_reservoir.AddItem('key2', i)
- >>> for i in xrange(100):
- >>> interleaved_reservoir.AddItem('key1', i)
- >>> interleaved_reservoir.AddItem('key2', i)
-
- separate_reservoir and interleaved_reservoir will be in identical states.
-
- See: https://en.wikipedia.org/wiki/Reservoir_sampling
-
- Adding items has amortized O(1) runtime.
-
- """
-
- def __init__(self, size, seed=0, always_keep_last=True):
- """Creates a new reservoir.
-
- Args:
- size: The number of values to keep in the reservoir for each tag. If 0,
- all values will be kept.
- seed: The seed of the random number generator to use when sampling.
- Different values for |seed| will produce different samples from the same
- input items.
- always_keep_last: Whether to always keep the latest seen item at the
- end of the reservoir. Defaults to True.
-
- Raises:
- ValueError: If size is negative or not an integer.
- """
- if size < 0 or size != round(size):
- raise ValueError('size must be a nonnegative integer, was %s' % size)
- self._buckets = collections.defaultdict(
- lambda: _ReservoirBucket(size, random.Random(seed), always_keep_last))
- # _mutex guards the keys: creating new keys, retrieving by key, etc.
- # The internal items are guarded by each _ReservoirBucket's own mutex.
- self._mutex = threading.Lock()
-
- def Keys(self):
- """Return all the keys in the reservoir.
-
- Returns:
- ['list', 'of', 'keys'] in the Reservoir.
- """
- with self._mutex:
- return list(self._buckets.keys())
-
- def Items(self, key):
- """Return items associated with given key.
-
- Args:
- key: The key for which we are finding associated items.
-
- Raises:
- KeyError: If the key is not found in the reservoir.
-
- Returns:
- [list, of, items] associated with that key.
- """
- with self._mutex:
- if key not in self._buckets:
- raise KeyError('Key %s was not found in Reservoir' % key)
- bucket = self._buckets[key]
- return bucket.Items()
-
- def AddItem(self, key, item, f=lambda x: x):
- """Add a new item to the Reservoir with the given tag.
-
- If the reservoir has not yet reached full size, the new item is guaranteed
- to be added. If the reservoir is full, then behavior depends on the
- always_keep_last boolean.
-
- If always_keep_last was set to true, the new item is guaranteed to be added
- to the reservoir, and either the previous last item will be replaced, or
- (with low probability) an older item will be replaced.
-
- If always_keep_last was set to false, then the new item will replace an
- old item with low probability.
-
- If f is provided, it will be applied to transform item (lazily, iff item is
- going to be included in the reservoir).
-
- Args:
- key: The key to store the item under.
- item: The item to add to the reservoir.
- f: An optional function to transform the item prior to addition.
- """
- with self._mutex:
- bucket = self._buckets[key]
- bucket.AddItem(item, f)
-
- def FilterItems(self, filterFn, key=None):
- """Filter items within a Reservoir, using a filtering function.
-
- Args:
- filterFn: A function that returns True for the items to be kept.
- key: An optional bucket key to filter. If not specified, all buckets
- are filtered.
-
- Returns:
- The number of items removed.
- """
- with self._mutex:
- if key:
- if key in self._buckets:
- return self._buckets[key].FilterItems(filterFn)
- else:
- return 0
- else:
- return sum(bucket.FilterItems(filterFn)
- for bucket in self._buckets.values())
-
-
-class _ReservoirBucket(object):
- """A container for items from a stream, that implements reservoir sampling.
-
- It always stores the most recent item as its final item.
- """
-
- def __init__(self, _max_size, _random=None, always_keep_last=True):
- """Create the _ReservoirBucket.
-
- Args:
- _max_size: The maximum size the reservoir bucket may grow to. If size is
- zero, the bucket has unbounded size.
- _random: The random number generator to use. If not specified, defaults to
- random.Random(0).
- always_keep_last: Whether the latest seen item should always be included
- in the end of the bucket.
-
- Raises:
- ValueError: if the size is not a nonnegative integer.
- """
- if _max_size < 0 or _max_size != round(_max_size):
- raise ValueError('_max_size must be a nonnegative int, was %s' % _max_size)
- self.items = []
- # This mutex protects the internal items, ensuring that calls to Items and
- # AddItem are thread-safe
- self._mutex = threading.Lock()
- self._max_size = _max_size
- self._num_items_seen = 0
- if _random is not None:
- self._random = _random
- else:
- self._random = random.Random(0)
- self.always_keep_last = always_keep_last
-
- def AddItem(self, item, f=lambda x: x):
- """Add an item to the ReservoirBucket, replacing an old item if necessary.
-
- The new item is guaranteed to be added to the bucket, and to be the last
- element in the bucket. If the bucket has reached capacity, then an old item
- will be replaced. With probability (_max_size/_num_items_seen) a random item
- in the bucket will be popped out and the new item will be appended
- to the end. With probability (1 - _max_size/_num_items_seen)
- the last item in the bucket will be replaced.
-
- Since the O(n) replacements occur with O(1/_num_items_seen) likelihood,
- the amortized runtime is O(1).
-
- Args:
- item: The item to add to the bucket.
- f: A function to transform item before addition, if it will be kept in
- the reservoir.
- """
- with self._mutex:
- if len(self.items) < self._max_size or self._max_size == 0:
- self.items.append(f(item))
- else:
- r = self._random.randint(0, self._num_items_seen)
- if r < self._max_size:
- self.items.pop(r)
- self.items.append(f(item))
- elif self.always_keep_last:
- self.items[-1] = f(item)
- self._num_items_seen += 1
-
- def FilterItems(self, filterFn):
- """Filter items in a ReservoirBucket, using a filtering function.
-
- Filtering items from the reservoir bucket must update the
- internal state variable self._num_items_seen, which is used for determining
- the rate of replacement in reservoir sampling. Ideally, self._num_items_seen
- would contain the exact number of items that have ever been seen by the
- ReservoirBucket and satisfy filterFn. However, the ReservoirBucket does not
- have access to all items seen -- it only has access to the subset of items
- that have survived sampling (self.items). Therefore, we estimate
- self._num_items_seen by scaling it by the same ratio as the ratio of items
- not removed from self.items.
-
- Args:
- filterFn: A function that returns True for items to be kept.
-
- Returns:
- The number of items removed from the bucket.
- """
- with self._mutex:
- size_before = len(self.items)
- self.items = list(filter(filterFn, self.items))
- size_diff = size_before - len(self.items)
-
- # Estimate a correction to the number of items seen.
- prop_remaining = len(self.items) / float(
- size_before) if size_before > 0 else 0
- self._num_items_seen = int(round(self._num_items_seen * prop_remaining))
- return size_diff
-
- def Items(self):
- """Get all the items in the bucket."""
- with self._mutex:
- return list(self.items)
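To make the AddItem and FilterItems docstrings above concrete, here is a small worked example; the tag name and values are illustrative:

from tensorflow.python.summary.impl import reservoir

r = reservoir.Reservoir(size=10, seed=0)
for i in range(1000):
  r.AddItem('loss', i)

items = r.Items('loss')
print(len(items))              # 10: the bucket never exceeds its size
print(items[-1])               # 999: always_keep_last keeps the newest item
print(items == sorted(items))  # True: surviving items stay in insertion order

# If item 1000 were added next, it would evict a random sampled slot with
# probability 10/1001 (the docstring's _max_size / _num_items_seen, give or
# take one in the denominator) and otherwise overwrite the last slot.

# FilterItems drops sampled items and rescales _num_items_seen by the
# fraction kept, e.g. keeping 6 of 10 sampled items scales it by 0.6.
removed = r.FilterItems(lambda x: x % 2 == 0, key='loss')
print(removed)                 # number of sampled odd values dropped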
diff --git a/tensorflow/python/summary/impl/reservoir_test.py b/tensorflow/python/summary/impl/reservoir_test.py
deleted file mode 100644
index c526d64f3d..0000000000
--- a/tensorflow/python/summary/impl/reservoir_test.py
+++ /dev/null
@@ -1,279 +0,0 @@
-# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ==============================================================================
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-from six.moves import xrange # pylint: disable=redefined-builtin
-
-from tensorflow.python.platform import test
-from tensorflow.python.summary.impl import reservoir
-
-
-class ReservoirTest(test.TestCase):
-
- def testEmptyReservoir(self):
- r = reservoir.Reservoir(1)
- self.assertFalse(r.Keys())
-
- def testRespectsSize(self):
- r = reservoir.Reservoir(42)
- self.assertEqual(r._buckets['meaning of life']._max_size, 42)
-
- def testItemsAndKeys(self):
- r = reservoir.Reservoir(42)
- r.AddItem('foo', 4)
- r.AddItem('bar', 9)
- r.AddItem('foo', 19)
- self.assertItemsEqual(r.Keys(), ['foo', 'bar'])
- self.assertEqual(r.Items('foo'), [4, 19])
- self.assertEqual(r.Items('bar'), [9])
-
- def testExceptions(self):
- with self.assertRaises(ValueError):
- reservoir.Reservoir(-1)
- with self.assertRaises(ValueError):
- reservoir.Reservoir(13.3)
-
- r = reservoir.Reservoir(12)
- with self.assertRaises(KeyError):
- r.Items('missing key')
-
- def testDeterminism(self):
- """Tests that the reservoir is deterministic."""
- key = 'key'
- r1 = reservoir.Reservoir(10)
- r2 = reservoir.Reservoir(10)
- for i in xrange(100):
- r1.AddItem('key', i)
- r2.AddItem('key', i)
-
- self.assertEqual(r1.Items(key), r2.Items(key))
-
- def testBucketDeterminism(self):
- """Tests that reservoirs are deterministic at a bucket level.
-
- This means that only the order in which elements are added within a bucket matters.
- """
- separate_reservoir = reservoir.Reservoir(10)
- interleaved_reservoir = reservoir.Reservoir(10)
- for i in xrange(100):
- separate_reservoir.AddItem('key1', i)
- for i in xrange(100):
- separate_reservoir.AddItem('key2', i)
- for i in xrange(100):
- interleaved_reservoir.AddItem('key1', i)
- interleaved_reservoir.AddItem('key2', i)
-
- for key in ['key1', 'key2']:
- self.assertEqual(
- separate_reservoir.Items(key), interleaved_reservoir.Items(key))
-
- def testUsesSeed(self):
- """Tests that reservoirs with different seeds keep different samples."""
- key = 'key'
- r1 = reservoir.Reservoir(10, seed=0)
- r2 = reservoir.Reservoir(10, seed=1)
- for i in xrange(100):
- r1.AddItem('key', i)
- r2.AddItem('key', i)
- self.assertNotEqual(r1.Items(key), r2.Items(key))
-
- def testFilterItemsByKey(self):
- r = reservoir.Reservoir(100, seed=0)
- for i in xrange(10):
- r.AddItem('key1', i)
- r.AddItem('key2', i)
-
- self.assertEqual(len(r.Items('key1')), 10)
- self.assertEqual(len(r.Items('key2')), 10)
-
- self.assertEqual(r.FilterItems(lambda x: x <= 7, 'key2'), 2)
- self.assertEqual(len(r.Items('key2')), 8)
- self.assertEqual(len(r.Items('key1')), 10)
-
- self.assertEqual(r.FilterItems(lambda x: x <= 3, 'key1'), 6)
- self.assertEqual(len(r.Items('key1')), 4)
- self.assertEqual(len(r.Items('key2')), 8)
-
-
-class ReservoirBucketTest(test.TestCase):
-
- def testEmptyBucket(self):
- b = reservoir._ReservoirBucket(1)
- self.assertFalse(b.Items())
-
- def testFillToSize(self):
- b = reservoir._ReservoirBucket(100)
- for i in xrange(100):
- b.AddItem(i)
- self.assertEqual(b.Items(), list(xrange(100)))
- self.assertEqual(b._num_items_seen, 100)
-
- def testDoesntOverfill(self):
- b = reservoir._ReservoirBucket(10)
- for i in xrange(1000):
- b.AddItem(i)
- self.assertEqual(len(b.Items()), 10)
- self.assertEqual(b._num_items_seen, 1000)
-
- def testMaintainsOrder(self):
- b = reservoir._ReservoirBucket(100)
- for i in xrange(10000):
- b.AddItem(i)
- items = b.Items()
- prev = -1
- for item in items:
- self.assertTrue(item > prev)
- prev = item
-
- def testKeepsLatestItem(self):
- b = reservoir._ReservoirBucket(5)
- for i in xrange(100):
- b.AddItem(i)
- last = b.Items()[-1]
- self.assertEqual(last, i)
-
- def testSizeOneBucket(self):
- b = reservoir._ReservoirBucket(1)
- for i in xrange(20):
- b.AddItem(i)
- self.assertEqual(b.Items(), [i])
- self.assertEqual(b._num_items_seen, 20)
-
- def testSizeZeroBucket(self):
- b = reservoir._ReservoirBucket(0)
- for i in xrange(20):
- b.AddItem(i)
- self.assertEqual(b.Items(), list(range(i + 1)))
- self.assertEqual(b._num_items_seen, 20)
-
- def testSizeRequirement(self):
- with self.assertRaises(ValueError):
- reservoir._ReservoirBucket(-1)
- with self.assertRaises(ValueError):
- reservoir._ReservoirBucket(10.3)
-
- def testRemovesItems(self):
- b = reservoir._ReservoirBucket(100)
- for i in xrange(10):
- b.AddItem(i)
- self.assertEqual(len(b.Items()), 10)
- self.assertEqual(b._num_items_seen, 10)
- self.assertEqual(b.FilterItems(lambda x: x <= 7), 2)
- self.assertEqual(len(b.Items()), 8)
- self.assertEqual(b._num_items_seen, 8)
-
- def testRemovesItemsWhenItemsAreReplaced(self):
- b = reservoir._ReservoirBucket(100)
- for i in xrange(10000):
- b.AddItem(i)
- self.assertEqual(b._num_items_seen, 10000)
-
- # Remove items
- num_removed = b.FilterItems(lambda x: x <= 7)
- self.assertGreater(num_removed, 92)
- self.assertEqual([], [item for item in b.Items() if item > 7])
- self.assertEqual(b._num_items_seen,
- int(round(10000 * (1 - float(num_removed) / 100))))
-
- def testLazyFunctionEvaluationAndAlwaysKeepLast(self):
-
- class FakeRandom(object):
-
- def randint(self, a, b): # pylint:disable=unused-argument
- return 999
-
- class Incrementer(object):
-
- def __init__(self):
- self.n = 0
-
- def increment_and_double(self, x):
- self.n += 1
- return x * 2
-
- # We've mocked the random number generator, so once the reservoir is full the
- # last item will never get durable reservoir inclusion. Since always_keep_last
- # is False, the function should only get invoked 100 times while filling up
- # the reservoir. This laziness property is an essential performance
- # optimization.
- b = reservoir._ReservoirBucket(100, FakeRandom(), always_keep_last=False)
- incrementer = Incrementer()
- for i in xrange(1000):
- b.AddItem(i, incrementer.increment_and_double)
- self.assertEqual(incrementer.n, 100)
- self.assertEqual(b.Items(), [x * 2 for x in xrange(100)])
-
- # This time, we will always keep the last item, meaning that the function
- # should get invoked once for every item we add.
- b = reservoir._ReservoirBucket(100, FakeRandom(), always_keep_last=True)
- incrementer = Incrementer()
-
- for i in xrange(1000):
- b.AddItem(i, incrementer.increment_and_double)
- self.assertEqual(incrementer.n, 1000)
- self.assertEqual(b.Items(), [x * 2 for x in xrange(99)] + [999 * 2])
-
-
-class ReservoirBucketStatisticalDistributionTest(test.TestCase):
-
- def setUp(self):
- self.total = 1000000
- self.samples = 10000
- self.n_buckets = 100
- self.total_per_bucket = self.total // self.n_buckets
- self.assertEqual(self.total % self.n_buckets, 0, 'total must be evenly '
- 'divisible by the number of buckets')
- self.assertTrue(self.total > self.samples, 'need to have more items '
- 'than samples')
-
- def AssertBinomialQuantity(self, measured):
- p = 1.0 * self.n_buckets / self.samples
- mean = p * self.samples
- variance = p * (1 - p) * self.samples
- error = measured - mean
- # Given that the buckets were actually binomially distributed, this
- # fails with probability ~2E-9
- passed = error * error <= 36.0 * variance
- self.assertTrue(passed, 'found a bucket with measured %d '
- 'too far from expected %d' % (measured, mean))
-
- def testBucketReservoirSamplingViaStatisticalProperties(self):
- # These 'buckets' are unrelated to _ReservoirBucket; they are the bins we
- # put samples into to test the shape of the distribution.
- b = reservoir._ReservoirBucket(_max_size=self.samples)
- # add one extra item because we always keep the most recent item, which
- # would skew the distribution; we can just slice it off the end instead.
- for i in xrange(self.total + 1):
- b.AddItem(i)
-
- divbins = [0] * self.n_buckets
- modbins = [0] * self.n_buckets
- # Slice off the last item when we iterate.
- for item in b.Items()[0:-1]:
- divbins[item // self.total_per_bucket] += 1
- modbins[item % self.n_buckets] += 1
-
- for bucket_index in xrange(self.n_buckets):
- divbin = divbins[bucket_index]
- modbin = modbins[bucket_index]
- self.AssertBinomialQuantity(divbin)
- self.AssertBinomialQuantity(modbin)
-
-
-if __name__ == '__main__':
- test.main()
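For reference, the tolerance in AssertBinomialQuantity works out as follows under the setUp constants (a back-of-the-envelope check; the test's p = n_buckets / samples coincides with the per-bin probability 1 / n_buckets only because samples == n_buckets ** 2 here):

import math

samples, n_buckets = 10000, 100
mean = samples / n_buckets                                       # 100 per bin
variance = samples * (1.0 / n_buckets) * (1 - 1.0 / n_buckets)   # ~99
tolerance = math.sqrt(36.0 * variance)                           # ~59.7, a 6-sigma band
print(mean, variance, tolerance)

Each measured bin count must land within roughly mean +/- 60; a truly binomial bin misses that band with probability on the order of 2e-9, matching the comment in the test.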