| author | Dandelion Mané <dandelion@google.com> | 2017-03-08 15:02:19 -0800 |
|---|---|---|
| committer | TensorFlower Gardener <gardener@tensorflow.org> | 2017-03-08 15:29:33 -0800 |
| commit | 8c1c861ccc488497ad44bb8ec7b1b49ff5ef0a2c (patch) | |
| tree | 74a2ef4f0efbd1b0897aa03b65d29666e2777ba8 /tensorflow/python/summary | |
| parent | 0b044236797eb3b2adb437c4f82d0897bceb554e (diff) | |
Move TensorBoard backend logic from python/summary to tensorboard/backend.
The EventMultiplexer and EventAccumulator classes are fairly tightly tied to TensorBoard, so we should migrate them to the tensorboard/ folder in preparation for extracting TensorBoard from the TensorFlow repo.
Change: 149587771
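For context on what this commit relocates: the docstrings in the deleted `event_accumulator.py` below describe the class's intended usage. The following is a minimal sketch assembled from those docstrings, not a canonical example; the import path is the pre-move location that this commit removes (the deleted tests import it the same way), and the log directory and `'loss'` tag are illustrative.

```python
# Minimal usage sketch, reconstructed from the docstrings in the deleted
# event_accumulator.py below. Import path is the pre-move location.
from tensorflow.python.summary import event_accumulator as ea

# Point the accumulator at a directory of tfevents files, or a single file.
acc = ea.EventAccumulator('/tmp/logdir')  # illustrative path
acc.Reload()  # Synchronously loads all events written so far.

# Tags() maps each tag type (e.g. 'scalars', 'images') to its known tags.
print(acc.Tags()[ea.SCALARS])

# Retrieve every ScalarEvent(wall_time, step, value) for one tag.
for event in acc.Scalars('loss'):  # 'loss' is a hypothetical tag name
    print(event.step, event.value)
```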
Diffstat (limited to 'tensorflow/python/summary')
| Mode | File | Deleted lines |
|---|---|---|
| -rw-r--r-- | tensorflow/python/summary/event_accumulator.py | 812 |
| -rw-r--r-- | tensorflow/python/summary/event_accumulator_test.py | 980 |
| -rw-r--r-- | tensorflow/python/summary/event_file_inspector.py | 427 |
| -rw-r--r-- | tensorflow/python/summary/event_file_inspector_test.py | 172 |
| -rw-r--r-- | tensorflow/python/summary/event_multiplexer.py | 428 |
| -rw-r--r-- | tensorflow/python/summary/event_multiplexer_test.py | 319 |
| -rw-r--r-- | tensorflow/python/summary/impl/__init__.py | 0 |
| -rw-r--r-- | tensorflow/python/summary/impl/directory_watcher.py | 254 |
| -rw-r--r-- | tensorflow/python/summary/impl/directory_watcher_test.py | 209 |
| -rw-r--r-- | tensorflow/python/summary/impl/event_file_loader.py | 80 |
| -rw-r--r-- | tensorflow/python/summary/impl/event_file_loader_test.py | 92 |
| -rw-r--r-- | tensorflow/python/summary/impl/io_wrapper.py | 52 |
| -rw-r--r-- | tensorflow/python/summary/impl/reservoir.py | 253 |
| -rw-r--r-- | tensorflow/python/summary/impl/reservoir_test.py | 279 |
14 files changed, 0 insertions, 4357 deletions
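One detail worth noting before the raw diff: the deleted accumulator bounds its memory use with the reservoir sampling in `impl/reservoir.py`, controlled by the `size_guidance` constructor argument; per its docstring, the value maps each tag type to a per-tag retention count, with 0 meaning "store all events". A hedged sketch of that knob, using the constructor signature and constants from the deleted `event_accumulator.py` (the path is illustrative, and unspecified keys fall back to `DEFAULT_SIZE_GUIDANCE`):

```python
# Sketch only: overriding the retention limits documented in the deleted
# event_accumulator.py. A size of 0 stores all events for that tag type.
from tensorflow.python.summary import event_accumulator as ea

acc = ea.EventAccumulator(
    '/tmp/logdir',  # illustrative path
    size_guidance={
        ea.SCALARS: 0,     # keep every scalar event
        ea.IMAGES: 4,      # keep a reservoir of 4 images per tag
        ea.HISTOGRAMS: 1,  # keep only one histogram per tag
    })
acc.Reload()
```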
diff --git a/tensorflow/python/summary/event_accumulator.py b/tensorflow/python/summary/event_accumulator.py deleted file mode 100644 index 23408705bd..0000000000 --- a/tensorflow/python/summary/event_accumulator.py +++ /dev/null @@ -1,812 +0,0 @@ -# Copyright 2015 The TensorFlow Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================== -"""Takes a generator of values, and accumulates them for a frontend.""" -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import collections -import os -import re -import threading - -import numpy as np - -from tensorflow.core.framework import graph_pb2 -from tensorflow.core.protobuf import meta_graph_pb2 -from tensorflow.core.protobuf.config_pb2 import RunMetadata -from tensorflow.core.util.event_pb2 import SessionLog -from tensorflow.python.platform import tf_logging as logging -from tensorflow.python.summary.impl import directory_watcher -from tensorflow.python.summary.impl import event_file_loader -from tensorflow.python.summary.impl import reservoir -from tensorflow.python.util import compat - -namedtuple = collections.namedtuple -ScalarEvent = namedtuple('ScalarEvent', ['wall_time', 'step', 'value']) - -HealthPillEvent = namedtuple( - 'HealthPillEvent', - ['wall_time', 'step', 'node_name', 'output_slot', 'value']) - -CompressedHistogramEvent = namedtuple('CompressedHistogramEvent', - ['wall_time', 'step', - 'compressed_histogram_values']) - -CompressedHistogramValue = namedtuple('CompressedHistogramValue', - ['basis_point', 'value']) - -HistogramEvent = namedtuple('HistogramEvent', - ['wall_time', 'step', 'histogram_value']) - -HistogramValue = namedtuple('HistogramValue', ['min', 'max', 'num', 'sum', - 'sum_squares', 'bucket_limit', - 'bucket']) - -ImageEvent = namedtuple('ImageEvent', ['wall_time', 'step', - 'encoded_image_string', 'width', - 'height']) - -AudioEvent = namedtuple('AudioEvent', ['wall_time', 'step', - 'encoded_audio_string', 'content_type', - 'sample_rate', 'length_frames']) - -TensorEvent = namedtuple('TensorEvent', ['wall_time', 'step', 'tensor_proto']) - -## Different types of summary events handled by the event_accumulator -SUMMARY_TYPES = { - 'simple_value': '_ProcessScalar', - 'histo': '_ProcessHistogram', - 'image': '_ProcessImage', - 'audio': '_ProcessAudio', - 'tensor': '_ProcessTensor', -} - -## The tagTypes below are just arbitrary strings chosen to pass the type -## information of the tag from the backend to the frontend -COMPRESSED_HISTOGRAMS = 'compressedHistograms' -HISTOGRAMS = 'histograms' -IMAGES = 'images' -AUDIO = 'audio' -SCALARS = 'scalars' -TENSORS = 'tensors' -HEALTH_PILLS = 'health_pills' -GRAPH = 'graph' -META_GRAPH = 'meta_graph' -RUN_METADATA = 'run_metadata' - -## Normal CDF for std_devs: (-Inf, -1.5, -1, -0.5, 0, 0.5, 1, 1.5, Inf) -## naturally gives bands around median of width 1 std dev, 2 std dev, 3 std dev, -## and then the long tail. 
-NORMAL_HISTOGRAM_BPS = (0, 668, 1587, 3085, 5000, 6915, 8413, 9332, 10000) - -DEFAULT_SIZE_GUIDANCE = { - COMPRESSED_HISTOGRAMS: 500, - IMAGES: 4, - AUDIO: 4, - SCALARS: 10000, - # We store this many health pills per op. - HEALTH_PILLS: 100, - HISTOGRAMS: 1, - TENSORS: 10, -} - -STORE_EVERYTHING_SIZE_GUIDANCE = { - COMPRESSED_HISTOGRAMS: 0, - IMAGES: 0, - AUDIO: 0, - SCALARS: 0, - HEALTH_PILLS: 0, - HISTOGRAMS: 0, - TENSORS: 0, -} - -# The tag that values containing health pills have. Health pill data is stored -# in tensors. In order to distinguish health pill values from scalar values, we -# rely on how health pill values have this special tag value. -_HEALTH_PILL_EVENT_TAG = '__health_pill__' - - -def IsTensorFlowEventsFile(path): - """Check the path name to see if it is probably a TF Events file. - - Args: - path: A file path to check if it is an event file. - - Raises: - ValueError: If the path is an empty string. - - Returns: - If path is formatted like a TensorFlowEventsFile. - """ - if not path: - raise ValueError('Path must be a nonempty string') - return 'tfevents' in compat.as_str_any(os.path.basename(path)) - - -class EventAccumulator(object): - """An `EventAccumulator` takes an event generator, and accumulates the values. - - The `EventAccumulator` is intended to provide a convenient Python interface - for loading Event data written during a TensorFlow run. TensorFlow writes out - `Event` protobuf objects, which have a timestamp and step number, and often - contain a `Summary`. Summaries can have different kinds of data like an image, - a scalar value, or a histogram. The Summaries also have a tag, which we use to - organize logically related data. The `EventAccumulator` supports retrieving - the `Event` and `Summary` data by its tag. - - Calling `Tags()` gets a map from `tagType` (e.g. `'images'`, - `'compressedHistograms'`, `'scalars'`, etc) to the associated tags for those - data types. Then, various functional endpoints (eg - `Accumulator.Scalars(tag)`) allow for the retrieval of all data - associated with that tag. - - The `Reload()` method synchronously loads all of the data written so far. - - Histograms, audio, and images are very large, so storing all of them is not - recommended. - @@Tensors - """ - - def __init__(self, - path, - size_guidance=DEFAULT_SIZE_GUIDANCE, - compression_bps=NORMAL_HISTOGRAM_BPS, - purge_orphaned_data=True): - """Construct the `EventAccumulator`. - - Args: - path: A file path to a directory containing tf events files, or a single - tf events file. The accumulator will load events from this path. - size_guidance: Information on how much data the EventAccumulator should - store in memory. The DEFAULT_SIZE_GUIDANCE tries not to store too much - so as to avoid OOMing the client. The size_guidance should be a map - from a `tagType` string to an integer representing the number of - items to keep per tag for items of that `tagType`. If the size is 0, - all events are stored. - compression_bps: Information on how the `EventAccumulator` should compress - histogram data for the `CompressedHistograms` tag (for details see - `ProcessCompressedHistogram`). - purge_orphaned_data: Whether to discard any events that were "orphaned" by - a TensorFlow restart. 
- """ - sizes = {} - for key in DEFAULT_SIZE_GUIDANCE: - if key in size_guidance: - sizes[key] = size_guidance[key] - else: - sizes[key] = DEFAULT_SIZE_GUIDANCE[key] - - self._first_event_timestamp = None - self._scalars = reservoir.Reservoir(size=sizes[SCALARS]) - - # Unlike the other reservoir, the reservoir for health pills is keyed by the - # name of the op instead of the tag. This lets us efficiently obtain the - # health pills per node. - self._health_pills = reservoir.Reservoir(size=sizes[HEALTH_PILLS]) - - self._graph = None - self._graph_from_metagraph = False - self._meta_graph = None - self._tagged_metadata = {} - self._histograms = reservoir.Reservoir(size=sizes[HISTOGRAMS]) - self._compressed_histograms = reservoir.Reservoir( - size=sizes[COMPRESSED_HISTOGRAMS], always_keep_last=False) - self._images = reservoir.Reservoir(size=sizes[IMAGES]) - self._audio = reservoir.Reservoir(size=sizes[AUDIO]) - self._tensors = reservoir.Reservoir(size=sizes[TENSORS]) - - self._generator_mutex = threading.Lock() - self._generator = _GeneratorFromPath(path) - - self._compression_bps = compression_bps - self.purge_orphaned_data = purge_orphaned_data - - self.most_recent_step = -1 - self.most_recent_wall_time = -1 - self.file_version = None - - # The attributes that get built up by the accumulator - self.accumulated_attrs = ('_scalars', '_histograms', - '_compressed_histograms', '_images', '_audio') - self._tensor_summaries = {} - - def Reload(self): - """Loads all events added since the last call to `Reload`. - - If `Reload` was never called, loads all events in the file. - - Returns: - The `EventAccumulator`. - """ - with self._generator_mutex: - for event in self._generator.Load(): - self._ProcessEvent(event) - return self - - def FirstEventTimestamp(self): - """Returns the timestamp in seconds of the first event. - - If the first event has been loaded (either by this method or by `Reload`, - this returns immediately. Otherwise, it will load in the first event. Note - that this means that calling `Reload` will cause this to block until - `Reload` has finished. - - Returns: - The timestamp in seconds of the first event that was loaded. - - Raises: - ValueError: If no events have been loaded and there were no events found - on disk. - """ - if self._first_event_timestamp is not None: - return self._first_event_timestamp - with self._generator_mutex: - try: - event = next(self._generator.Load()) - self._ProcessEvent(event) - return self._first_event_timestamp - - except StopIteration: - raise ValueError('No event timestamp could be found') - - def _ProcessEvent(self, event): - """Called whenever an event is loaded.""" - if self._first_event_timestamp is None: - self._first_event_timestamp = event.wall_time - - if event.HasField('file_version'): - new_file_version = _ParseFileVersion(event.file_version) - if self.file_version and self.file_version != new_file_version: - ## This should not happen. - logging.warn(('Found new file_version for event.proto. This will ' - 'affect purging logic for TensorFlow restarts. ' - 'Old: {0} New: {1}').format(self.file_version, - new_file_version)) - self.file_version = new_file_version - - self._MaybePurgeOrphanedData(event) - - ## Process the event. - # GraphDef and MetaGraphDef are handled in a special way: - # If no graph_def Event is available, but a meta_graph_def is, and it - # contains a graph_def, then use the meta_graph_def.graph_def as our graph. - # If a graph_def Event is available, always prefer it to the graph_def - # inside the meta_graph_def. 
- if event.HasField('graph_def'): - if self._graph is not None: - logging.warn(('Found more than one graph event per run, or there was ' - 'a metagraph containing a graph_def, as well as one or ' - 'more graph events. Overwriting the graph with the ' - 'newest event.')) - self._graph = event.graph_def - self._graph_from_metagraph = False - elif event.HasField('meta_graph_def'): - if self._meta_graph is not None: - logging.warn(('Found more than one metagraph event per run. ' - 'Overwriting the metagraph with the newest event.')) - self._meta_graph = event.meta_graph_def - if self._graph is None or self._graph_from_metagraph: - # We may have a graph_def in the metagraph. If so, and no - # graph_def is directly available, use this one instead. - meta_graph = meta_graph_pb2.MetaGraphDef() - meta_graph.ParseFromString(self._meta_graph) - if meta_graph.graph_def: - if self._graph is not None: - logging.warn(('Found multiple metagraphs containing graph_defs,' - 'but did not find any graph events. Overwriting the ' - 'graph with the newest metagraph version.')) - self._graph_from_metagraph = True - self._graph = meta_graph.graph_def.SerializeToString() - elif event.HasField('tagged_run_metadata'): - tag = event.tagged_run_metadata.tag - if tag in self._tagged_metadata: - logging.warn('Found more than one "run metadata" event with tag ' + - tag + '. Overwriting it with the newest event.') - self._tagged_metadata[tag] = event.tagged_run_metadata.run_metadata - elif event.HasField('summary'): - for value in event.summary.value: - if value.HasField('tensor') and value.tag == _HEALTH_PILL_EVENT_TAG: - self._ProcessHealthPillSummary(value, event) - else: - for summary_type, summary_func in SUMMARY_TYPES.items(): - if value.HasField(summary_type): - datum = getattr(value, summary_type) - tag = value.node_name if summary_type == 'tensor' else value.tag - getattr(self, summary_func)(tag, event.wall_time, event.step, - datum) - - def _ProcessHealthPillSummary(self, value, event): - """Process summaries containing health pills. - - These summaries are distinguished by the fact that they have a Tensor field - and have a special tag value. - - This method emits ERROR-level messages to the logs if it encounters Tensor - summaries that it cannot process. - - Args: - value: A summary_pb2.Summary.Value with a Tensor field. - event: The event_pb2.Event containing that value. - """ - elements = np.fromstring(value.tensor.tensor_content, dtype=np.float64) - - # The node_name property of the value object is actually a watch key: a - # combination of node name, output slot, and a suffix. We capture the - # actual node name and the output slot with a regular expression. - match = re.match(r'^(.*):(\d+):DebugNumericSummary$', value.node_name) - if not match: - logging.log_first_n( - logging.ERROR, - 'Unsupported watch key %s for health pills; skipping this sequence.', - 1, - value.node_name) - return - - node_name = match.group(1) - output_slot = int(match.group(2)) - self._ProcessHealthPill( - event.wall_time, event.step, node_name, output_slot, elements) - - def Tags(self): - """Return all tags found in the value stream. - - Returns: - A `{tagType: ['list', 'of', 'tags']}` dictionary. 
- """ - return { - IMAGES: self._images.Keys(), - AUDIO: self._audio.Keys(), - HISTOGRAMS: self._histograms.Keys(), - SCALARS: self._scalars.Keys(), - COMPRESSED_HISTOGRAMS: self._compressed_histograms.Keys(), - TENSORS: self._tensors.Keys(), - # Use a heuristic: if the metagraph is available, but - # graph is not, then we assume the metagraph contains the graph. - GRAPH: self._graph is not None, - META_GRAPH: self._meta_graph is not None, - RUN_METADATA: list(self._tagged_metadata.keys()) - } - - def Scalars(self, tag): - """Given a summary tag, return all associated `ScalarEvent`s. - - Args: - tag: A string tag associated with the events. - - Raises: - KeyError: If the tag is not found. - - Returns: - An array of `ScalarEvent`s. - """ - return self._scalars.Items(tag) - - def HealthPills(self, node_name): - """Returns all health pill values for a certain node. - - Args: - node_name: The name of the node to obtain health pills for. - - Raises: - KeyError: If the node name is not found. - - Returns: - An array of `HealthPillEvent`s. - """ - return self._health_pills.Items(node_name) - - def Graph(self): - """Return the graph definition, if there is one. - - If the graph is stored directly, return that. If no graph is stored - directly but a metagraph is stored containing a graph, return that. - - Raises: - ValueError: If there is no graph for this run. - - Returns: - The `graph_def` proto. - """ - graph = graph_pb2.GraphDef() - if self._graph is not None: - graph.ParseFromString(self._graph) - return graph - raise ValueError('There is no graph in this EventAccumulator') - - def MetaGraph(self): - """Return the metagraph definition, if there is one. - - Raises: - ValueError: If there is no metagraph for this run. - - Returns: - The `meta_graph_def` proto. - """ - if self._meta_graph is None: - raise ValueError('There is no metagraph in this EventAccumulator') - meta_graph = meta_graph_pb2.MetaGraphDef() - meta_graph.ParseFromString(self._meta_graph) - return meta_graph - - def RunMetadata(self, tag): - """Given a tag, return the associated session.run() metadata. - - Args: - tag: A string tag associated with the event. - - Raises: - ValueError: If the tag is not found. - - Returns: - The metadata in form of `RunMetadata` proto. - """ - if tag not in self._tagged_metadata: - raise ValueError('There is no run metadata with this tag name') - - run_metadata = RunMetadata() - run_metadata.ParseFromString(self._tagged_metadata[tag]) - return run_metadata - - def Histograms(self, tag): - """Given a summary tag, return all associated histograms. - - Args: - tag: A string tag associated with the events. - - Raises: - KeyError: If the tag is not found. - - Returns: - An array of `HistogramEvent`s. - """ - return self._histograms.Items(tag) - - def CompressedHistograms(self, tag): - """Given a summary tag, return all associated compressed histograms. - - Args: - tag: A string tag associated with the events. - - Raises: - KeyError: If the tag is not found. - - Returns: - An array of `CompressedHistogramEvent`s. - """ - return self._compressed_histograms.Items(tag) - - def Images(self, tag): - """Given a summary tag, return all associated images. - - Args: - tag: A string tag associated with the events. - - Raises: - KeyError: If the tag is not found. - - Returns: - An array of `ImageEvent`s. - """ - return self._images.Items(tag) - - def Audio(self, tag): - """Given a summary tag, return all associated audio. - - Args: - tag: A string tag associated with the events. 
- - Raises: - KeyError: If the tag is not found. - - Returns: - An array of `AudioEvent`s. - """ - return self._audio.Items(tag) - - def Tensors(self, tag): - """Given a summary tag, return all associated tensors. - - Args: - tag: A string tag associated with the events. - - Raises: - KeyError: If the tag is not found. - - Returns: - An array of `TensorEvent`s. - """ - return self._tensors.Items(tag) - - def _MaybePurgeOrphanedData(self, event): - """Maybe purge orphaned data due to a TensorFlow crash. - - When TensorFlow crashes at step T+O and restarts at step T, any events - written after step T are now "orphaned" and will be at best misleading if - they are included in TensorBoard. - - This logic attempts to determine if there is orphaned data, and purge it - if it is found. - - Args: - event: The event to use as a reference, to determine if a purge is needed. - """ - if not self.purge_orphaned_data: - return - ## Check if the event happened after a crash, and purge expired tags. - if self.file_version and self.file_version >= 2: - ## If the file_version is recent enough, use the SessionLog enum - ## to check for restarts. - self._CheckForRestartAndMaybePurge(event) - else: - ## If there is no file version, default to old logic of checking for - ## out of order steps. - self._CheckForOutOfOrderStepAndMaybePurge(event) - - def _CheckForRestartAndMaybePurge(self, event): - """Check and discard expired events using SessionLog.START. - - Check for a SessionLog.START event and purge all previously seen events - with larger steps, because they are out of date. Because of supervisor - threading, it is possible that this logic will cause the first few event - messages to be discarded since supervisor threading does not guarantee - that the START message is deterministically written first. - - This method is preferred over _CheckForOutOfOrderStepAndMaybePurge which - can inadvertently discard events due to supervisor threading. - - Args: - event: The event to use as reference. If the event is a START event, all - previously seen events with a greater event.step will be purged. - """ - if event.HasField( - 'session_log') and event.session_log.status == SessionLog.START: - self._Purge(event, by_tags=False) - - def _CheckForOutOfOrderStepAndMaybePurge(self, event): - """Check for out-of-order event.step and discard expired events for tags. - - Check if the event is out of order relative to the global most recent step. - If it is, purge outdated summaries for tags that the event contains. - - Args: - event: The event to use as reference. If the event is out-of-order, all - events with the same tags, but with a greater event.step will be purged. 
- """ - if event.step < self.most_recent_step and event.HasField('summary'): - self._Purge(event, by_tags=True) - else: - self.most_recent_step = event.step - self.most_recent_wall_time = event.wall_time - - def _ConvertHistogramProtoToTuple(self, histo): - return HistogramValue(min=histo.min, - max=histo.max, - num=histo.num, - sum=histo.sum, - sum_squares=histo.sum_squares, - bucket_limit=list(histo.bucket_limit), - bucket=list(histo.bucket)) - - def _ProcessHistogram(self, tag, wall_time, step, histo): - """Processes a proto histogram by adding it to accumulated state.""" - histo = self._ConvertHistogramProtoToTuple(histo) - histo_ev = HistogramEvent(wall_time, step, histo) - self._histograms.AddItem(tag, histo_ev) - self._compressed_histograms.AddItem( - tag, histo_ev, lambda x: _CompressHistogram(x, self._compression_bps)) - - def _ProcessImage(self, tag, wall_time, step, image): - """Processes an image by adding it to accumulated state.""" - event = ImageEvent(wall_time=wall_time, - step=step, - encoded_image_string=image.encoded_image_string, - width=image.width, - height=image.height) - self._images.AddItem(tag, event) - - def _ProcessAudio(self, tag, wall_time, step, audio): - """Processes a audio by adding it to accumulated state.""" - event = AudioEvent(wall_time=wall_time, - step=step, - encoded_audio_string=audio.encoded_audio_string, - content_type=audio.content_type, - sample_rate=audio.sample_rate, - length_frames=audio.length_frames) - self._audio.AddItem(tag, event) - - def _ProcessScalar(self, tag, wall_time, step, scalar): - """Processes a simple value by adding it to accumulated state.""" - sv = ScalarEvent(wall_time=wall_time, step=step, value=scalar) - self._scalars.AddItem(tag, sv) - - def _ProcessTensor(self, tag, wall_time, step, tensor): - tv = TensorEvent(wall_time=wall_time, step=step, tensor_proto=tensor) - self._tensors.AddItem(tag, tv) - - def _ProcessHealthPill(self, wall_time, step, node_name, output_slot, - elements): - """Processes a health pill value by adding it to accumulated state. - - Args: - wall_time: The time at which the health pill was created. Provided by the - debugger. - step: The step at which the health pill was created. Provided by the - debugger. - node_name: The name of the node for this health pill. - output_slot: The output slot for this health pill. - elements: An ND array of 12 floats. The elements of the health pill. - """ - # Key by the node name for fast retrieval of health pills by node name. The - # array is cast to a list so that it is JSON-able. The debugger data plugin - # serves a JSON response. - self._health_pills.AddItem( - node_name, - HealthPillEvent( - wall_time=wall_time, - step=step, - node_name=node_name, - output_slot=output_slot, - value=list(elements))) - - def _Purge(self, event, by_tags): - """Purge all events that have occurred after the given event.step. - - If by_tags is True, purge all events that occurred after the given - event.step, but only for the tags that the event has. Non-sequential - event.steps suggest that a TensorFlow restart occurred, and we discard - the out-of-order events to display a consistent view in TensorBoard. - - Discarding by tags is the safer method, when we are unsure whether a restart - has occurred, given that threading in supervisor can cause events of - different tags to arrive with unsynchronized step values. - - If by_tags is False, then purge all events with event.step greater than the - given event.step. 
This can be used when we are certain that a TensorFlow - restart has occurred and these events can be discarded. - - Args: - event: The event to use as reference for the purge. All events with - the same tags, but with a greater event.step will be purged. - by_tags: Bool to dictate whether to discard all out-of-order events or - only those that are associated with the given reference event. - """ - ## Keep data in reservoirs that has a step less than event.step - _NotExpired = lambda x: x.step < event.step - - if by_tags: - - def _ExpiredPerTag(value): - return [getattr(self, x).FilterItems(_NotExpired, value.tag) - for x in self.accumulated_attrs] - - expired_per_tags = [_ExpiredPerTag(value) - for value in event.summary.value] - expired_per_type = [sum(x) for x in zip(*expired_per_tags)] - else: - expired_per_type = [getattr(self, x).FilterItems(_NotExpired) - for x in self.accumulated_attrs] - - if sum(expired_per_type) > 0: - purge_msg = _GetPurgeMessage(self.most_recent_step, - self.most_recent_wall_time, event.step, - event.wall_time, *expired_per_type) - logging.warn(purge_msg) - - -def _GetPurgeMessage(most_recent_step, most_recent_wall_time, event_step, - event_wall_time, num_expired_scalars, num_expired_histos, - num_expired_comp_histos, num_expired_images, - num_expired_audio): - """Return the string message associated with TensorBoard purges.""" - return ('Detected out of order event.step likely caused by ' - 'a TensorFlow restart. Purging expired events from Tensorboard' - ' display between the previous step: {} (timestamp: {}) and ' - 'current step: {} (timestamp: {}). Removing {} scalars, {} ' - 'histograms, {} compressed histograms, {} images, ' - 'and {} audio.').format(most_recent_step, most_recent_wall_time, - event_step, event_wall_time, - num_expired_scalars, num_expired_histos, - num_expired_comp_histos, num_expired_images, - num_expired_audio) - - -def _GeneratorFromPath(path): - """Create an event generator for file or directory at given path string.""" - if not path: - raise ValueError('path must be a valid string') - if IsTensorFlowEventsFile(path): - return event_file_loader.EventFileLoader(path) - else: - return directory_watcher.DirectoryWatcher( - path, event_file_loader.EventFileLoader, IsTensorFlowEventsFile) - - -def _ParseFileVersion(file_version): - """Convert the string file_version in event.proto into a float. - - Args: - file_version: String file_version from event.proto - - Returns: - Version number as a float. - """ - tokens = file_version.split('brain.Event:') - try: - return float(tokens[-1]) - except ValueError: - ## This should never happen according to the definition of file_version - ## specified in event.proto. - logging.warn(('Invalid event.proto file_version. Defaulting to use of ' - 'out-of-order event.step logic for purging expired events.')) - return -1 - - -def _CompressHistogram(histo_ev, bps): - """Creates fixed size histogram by adding compression to accumulated state. - - This routine transforms a histogram at a particular step by linearly - interpolating its variable number of buckets to represent their cumulative - weight at a constant number of compression points. This significantly reduces - the size of the histogram and makes it suitable for a two-dimensional area - plot where the output of this routine constitutes the ranges for a single x - coordinate. - - Args: - histo_ev: A HistogramEvent namedtuple. - bps: Compression points represented in basis points, 1/100ths of a percent. 
- - Returns: - CompressedHistogramEvent namedtuple. - """ - # See also: Histogram::Percentile() in core/lib/histogram/histogram.cc - histo = histo_ev.histogram_value - if not histo.num: - return CompressedHistogramEvent( - histo_ev.wall_time, - histo_ev.step, - [CompressedHistogramValue(b, 0.0) for b in bps]) - bucket = np.array(histo.bucket) - weights = (bucket * bps[-1] / (bucket.sum() or 1.0)).cumsum() - values = [] - j = 0 - while j < len(bps): - i = np.searchsorted(weights, bps[j], side='right') - while i < len(weights): - cumsum = weights[i] - cumsum_prev = weights[i - 1] if i > 0 else 0.0 - if cumsum == cumsum_prev: # prevent remap divide by zero - i += 1 - continue - if not i or not cumsum_prev: - lhs = histo.min - else: - lhs = max(histo.bucket_limit[i - 1], histo.min) - rhs = min(histo.bucket_limit[i], histo.max) - weight = _Remap(bps[j], cumsum_prev, cumsum, lhs, rhs) - values.append(CompressedHistogramValue(bps[j], weight)) - j += 1 - break - else: - break - while j < len(bps): - values.append(CompressedHistogramValue(bps[j], histo.max)) - j += 1 - return CompressedHistogramEvent(histo_ev.wall_time, histo_ev.step, values) - - -def _Remap(x, x0, x1, y0, y1): - """Linearly map from [x0, x1] unto [y0, y1].""" - return y0 + (x - x0) * float(y1 - y0) / (x1 - x0) diff --git a/tensorflow/python/summary/event_accumulator_test.py b/tensorflow/python/summary/event_accumulator_test.py deleted file mode 100644 index 0577486df5..0000000000 --- a/tensorflow/python/summary/event_accumulator_test.py +++ /dev/null @@ -1,980 +0,0 @@ -# Copyright 2015 The TensorFlow Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================== - -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import os - -import numpy as np -import six -from six.moves import xrange # pylint: disable=redefined-builtin - -from tensorflow.core.framework import graph_pb2 -from tensorflow.core.framework import summary_pb2 -from tensorflow.core.protobuf import config_pb2 -from tensorflow.core.util import event_pb2 -from tensorflow.python.framework import constant_op -from tensorflow.python.framework import dtypes -from tensorflow.python.framework import ops -from tensorflow.python.framework import tensor_util -from tensorflow.python.ops import array_ops -from tensorflow.python.platform import gfile -from tensorflow.python.platform import googletest -from tensorflow.python.platform import test -from tensorflow.python.platform import tf_logging as logging -from tensorflow.python.summary import event_accumulator as ea -from tensorflow.python.summary import summary as summary_lib -from tensorflow.python.summary.writer import writer as writer_lib -from tensorflow.python.summary.writer.writer import SummaryToEventTransformer -from tensorflow.python.training import saver - - -class _EventGenerator(object): - """Class that can add_events and then yield them back. 
- - Satisfies the EventGenerator API required for the EventAccumulator. - Satisfies the EventWriter API required to create a SummaryWriter. - - Has additional convenience methods for adding test events. - """ - - def __init__(self, testcase, zero_out_timestamps=False): - self._testcase = testcase - self.items = [] - self.zero_out_timestamps = zero_out_timestamps - - def Load(self): - while self.items: - yield self.items.pop(0) - - def AddScalar(self, tag, wall_time=0, step=0, value=0): - event = event_pb2.Event( - wall_time=wall_time, - step=step, - summary=summary_pb2.Summary( - value=[summary_pb2.Summary.Value( - tag=tag, simple_value=value)])) - self.AddEvent(event) - - def AddHealthPill(self, wall_time, step, node_name, output_slot, elements): - event = event_pb2.Event() - event.wall_time = wall_time - event.step = step - value = event.summary.value.add() - # The node_name property is actually a watch key. - value.node_name = '%s:%d:DebugNumericSummary' % (node_name, output_slot) - value.tag = '__health_pill__' - value.tensor.tensor_shape.dim.add().size = len(elements) - value.tensor.tensor_content = np.array(elements, dtype=np.float64).tobytes() - self.AddEvent(event) - - def AddHistogram(self, - tag, - wall_time=0, - step=0, - hmin=1, - hmax=2, - hnum=3, - hsum=4, - hsum_squares=5, - hbucket_limit=None, - hbucket=None): - histo = summary_pb2.HistogramProto( - min=hmin, - max=hmax, - num=hnum, - sum=hsum, - sum_squares=hsum_squares, - bucket_limit=hbucket_limit, - bucket=hbucket) - event = event_pb2.Event( - wall_time=wall_time, - step=step, - summary=summary_pb2.Summary( - value=[summary_pb2.Summary.Value( - tag=tag, histo=histo)])) - self.AddEvent(event) - - def AddImage(self, - tag, - wall_time=0, - step=0, - encoded_image_string=b'imgstr', - width=150, - height=100): - image = summary_pb2.Summary.Image( - encoded_image_string=encoded_image_string, width=width, height=height) - event = event_pb2.Event( - wall_time=wall_time, - step=step, - summary=summary_pb2.Summary( - value=[summary_pb2.Summary.Value( - tag=tag, image=image)])) - self.AddEvent(event) - - def AddAudio(self, - tag, - wall_time=0, - step=0, - encoded_audio_string=b'sndstr', - content_type='audio/wav', - sample_rate=44100, - length_frames=22050): - audio = summary_pb2.Summary.Audio( - encoded_audio_string=encoded_audio_string, - content_type=content_type, - sample_rate=sample_rate, - length_frames=length_frames) - event = event_pb2.Event( - wall_time=wall_time, - step=step, - summary=summary_pb2.Summary( - value=[summary_pb2.Summary.Value( - tag=tag, audio=audio)])) - self.AddEvent(event) - - def AddEvent(self, event): - if self.zero_out_timestamps: - event.wall_time = 0 - self.items.append(event) - - def add_event(self, event): # pylint: disable=invalid-name - """Match the EventWriter API.""" - self.AddEvent(event) - - def get_logdir(self): # pylint: disable=invalid-name - """Return a temp directory for asset writing.""" - return self._testcase.get_temp_dir() - - -class EventAccumulatorTest(test.TestCase): - - def assertTagsEqual(self, actual, expected): - """Utility method for checking the return value of the Tags() call. - - It fills out the `expected` arg with the default (empty) values for every - tag type, so that the author needs only specify the non-empty values they - are interested in testing. - - Args: - actual: The actual Accumulator tags response. 
- expected: The expected tags response (empty fields may be omitted) - """ - - empty_tags = { - ea.IMAGES: [], - ea.AUDIO: [], - ea.SCALARS: [], - ea.HISTOGRAMS: [], - ea.COMPRESSED_HISTOGRAMS: [], - ea.GRAPH: False, - ea.META_GRAPH: False, - ea.RUN_METADATA: [], - ea.TENSORS: [], - } - - # Verifies that there are no unexpected keys in the actual response. - # If this line fails, likely you added a new tag type, and need to update - # the empty_tags dictionary above. - self.assertItemsEqual(actual.keys(), empty_tags.keys()) - - for key in actual: - expected_value = expected.get(key, empty_tags[key]) - if isinstance(expected_value, list): - self.assertItemsEqual(actual[key], expected_value) - else: - self.assertEqual(actual[key], expected_value) - - -class MockingEventAccumulatorTest(EventAccumulatorTest): - - def setUp(self): - super(MockingEventAccumulatorTest, self).setUp() - self.stubs = googletest.StubOutForTesting() - self._real_constructor = ea.EventAccumulator - self._real_generator = ea._GeneratorFromPath - - def _FakeAccumulatorConstructor(generator, *args, **kwargs): - ea._GeneratorFromPath = lambda x: generator - return self._real_constructor(generator, *args, **kwargs) - - ea.EventAccumulator = _FakeAccumulatorConstructor - - def tearDown(self): - self.stubs.CleanUp() - ea.EventAccumulator = self._real_constructor - ea._GeneratorFromPath = self._real_generator - - def testEmptyAccumulator(self): - gen = _EventGenerator(self) - x = ea.EventAccumulator(gen) - x.Reload() - self.assertTagsEqual(x.Tags(), {}) - - def testTags(self): - gen = _EventGenerator(self) - gen.AddScalar('s1') - gen.AddScalar('s2') - gen.AddHistogram('hst1') - gen.AddHistogram('hst2') - gen.AddImage('im1') - gen.AddImage('im2') - gen.AddAudio('snd1') - gen.AddAudio('snd2') - acc = ea.EventAccumulator(gen) - acc.Reload() - self.assertTagsEqual(acc.Tags(), { - ea.IMAGES: ['im1', 'im2'], - ea.AUDIO: ['snd1', 'snd2'], - ea.SCALARS: ['s1', 's2'], - ea.HISTOGRAMS: ['hst1', 'hst2'], - ea.COMPRESSED_HISTOGRAMS: ['hst1', 'hst2'], - }) - - def testReload(self): - gen = _EventGenerator(self) - acc = ea.EventAccumulator(gen) - acc.Reload() - self.assertTagsEqual(acc.Tags(), {}) - gen.AddScalar('s1') - gen.AddScalar('s2') - gen.AddHistogram('hst1') - gen.AddHistogram('hst2') - gen.AddImage('im1') - gen.AddImage('im2') - gen.AddAudio('snd1') - gen.AddAudio('snd2') - acc.Reload() - self.assertTagsEqual(acc.Tags(), { - ea.IMAGES: ['im1', 'im2'], - ea.AUDIO: ['snd1', 'snd2'], - ea.SCALARS: ['s1', 's2'], - ea.HISTOGRAMS: ['hst1', 'hst2'], - ea.COMPRESSED_HISTOGRAMS: ['hst1', 'hst2'], - }) - - def testScalars(self): - gen = _EventGenerator(self) - acc = ea.EventAccumulator(gen) - s1 = ea.ScalarEvent(wall_time=1, step=10, value=32) - s2 = ea.ScalarEvent(wall_time=2, step=12, value=64) - gen.AddScalar('s1', wall_time=1, step=10, value=32) - gen.AddScalar('s2', wall_time=2, step=12, value=64) - acc.Reload() - self.assertEqual(acc.Scalars('s1'), [s1]) - self.assertEqual(acc.Scalars('s2'), [s2]) - - def _compareHealthPills(self, expected_event, gotten_event): - """Compares 2 health pills. - - Args: - expected_event: The expected HealthPillEvent. - gotten_event: The gotten HealthPillEvent. 
- """ - self.assertEqual(expected_event.wall_time, gotten_event.wall_time) - self.assertEqual(expected_event.step, gotten_event.step) - self.assertEqual(expected_event.node_name, gotten_event.node_name) - self.assertEqual(expected_event.output_slot, gotten_event.output_slot) - self.assertEqual(len(expected_event.value), len(gotten_event.value)) - for i, expected_value in enumerate(expected_event.value): - self.assertEqual(expected_value, gotten_event.value[i]) - - def testHealthPills(self): - gen = _EventGenerator(self) - acc = ea.EventAccumulator(gen) - gen.AddHealthPill(13371337, 41, 'Add', 0, range(1, 13)) - gen.AddHealthPill(13381338, 42, 'Add', 1, range(42, 54)) - - acc = ea.EventAccumulator(gen) - acc.Reload() - - # Retrieve the health pills for each node name. - gotten_events = acc.HealthPills('Add') - self.assertEquals(2, len(gotten_events)) - self._compareHealthPills( - ea.HealthPillEvent( - wall_time=13371337, - step=41, - node_name='Add', - output_slot=0, - value=range(1, 13)), - gotten_events[0]) - self._compareHealthPills( - ea.HealthPillEvent( - wall_time=13381338, - step=42, - node_name='Add', - output_slot=1, - value=range(42, 54)), - gotten_events[1]) - - def testHistograms(self): - gen = _EventGenerator(self) - acc = ea.EventAccumulator(gen) - - val1 = ea.HistogramValue( - min=1, - max=2, - num=3, - sum=4, - sum_squares=5, - bucket_limit=[1, 2, 3], - bucket=[0, 3, 0]) - val2 = ea.HistogramValue( - min=-2, - max=3, - num=4, - sum=5, - sum_squares=6, - bucket_limit=[2, 3, 4], - bucket=[1, 3, 0]) - - hst1 = ea.HistogramEvent(wall_time=1, step=10, histogram_value=val1) - hst2 = ea.HistogramEvent(wall_time=2, step=12, histogram_value=val2) - gen.AddHistogram( - 'hst1', - wall_time=1, - step=10, - hmin=1, - hmax=2, - hnum=3, - hsum=4, - hsum_squares=5, - hbucket_limit=[1, 2, 3], - hbucket=[0, 3, 0]) - gen.AddHistogram( - 'hst2', - wall_time=2, - step=12, - hmin=-2, - hmax=3, - hnum=4, - hsum=5, - hsum_squares=6, - hbucket_limit=[2, 3, 4], - hbucket=[1, 3, 0]) - acc.Reload() - self.assertEqual(acc.Histograms('hst1'), [hst1]) - self.assertEqual(acc.Histograms('hst2'), [hst2]) - - def testCompressedHistograms(self): - gen = _EventGenerator(self) - acc = ea.EventAccumulator(gen, compression_bps=(0, 2500, 5000, 7500, 10000)) - - gen.AddHistogram( - 'hst1', - wall_time=1, - step=10, - hmin=1, - hmax=2, - hnum=3, - hsum=4, - hsum_squares=5, - hbucket_limit=[1, 2, 3], - hbucket=[0, 3, 0]) - gen.AddHistogram( - 'hst2', - wall_time=2, - step=12, - hmin=-2, - hmax=3, - hnum=4, - hsum=5, - hsum_squares=6, - hbucket_limit=[2, 3, 4], - hbucket=[1, 3, 0]) - acc.Reload() - - # Create the expected values after compressing hst1 - expected_vals1 = [ - ea.CompressedHistogramValue(bp, val) - for bp, val in [(0, 1.0), (2500, 1.25), (5000, 1.5), (7500, 1.75 - ), (10000, 2.0)] - ] - expected_cmphst1 = ea.CompressedHistogramEvent( - wall_time=1, step=10, compressed_histogram_values=expected_vals1) - self.assertEqual(acc.CompressedHistograms('hst1'), [expected_cmphst1]) - - # Create the expected values after compressing hst2 - expected_vals2 = [ - ea.CompressedHistogramValue(bp, val) - for bp, val in [(0, -2), - (2500, 2), - (5000, 2 + 1 / 3), - (7500, 2 + 2 / 3), - (10000, 3)] - ] - expected_cmphst2 = ea.CompressedHistogramEvent( - wall_time=2, step=12, compressed_histogram_values=expected_vals2) - self.assertEqual(acc.CompressedHistograms('hst2'), [expected_cmphst2]) - - def testCompressedHistogramsWithEmptyHistogram(self): - gen = _EventGenerator(self) - acc = ea.EventAccumulator(gen, 
compression_bps=(0, 2500, 5000, 7500, 10000)) - - gen.AddHistogram( - 'hst1', - wall_time=1, - step=10, - hmin=None, - hmax=None, - hnum=0, - hsum=0, - hsum_squares=0, - hbucket_limit=[1, 2, 3], - hbucket=[0, 0, 0]) - acc.Reload() - - # Create the expected values after compressing hst1 - expected_vals1 = [ - ea.CompressedHistogramValue(bp, val) - for bp, val in [(0, 0.0), (2500, 0), (5000, 0), (7500, 0), (10000, 0)] - ] - expected_cmphst1 = ea.CompressedHistogramEvent( - wall_time=1, step=10, compressed_histogram_values=expected_vals1) - self.assertEqual(acc.CompressedHistograms('hst1'), [expected_cmphst1]) - - def testCompressHistogram_uglyHistogram(self): - bps = (0, 668, 1587, 3085, 5000, 6915, 8413, 9332, 10000) - histogram_values = ea.HistogramValue( - min=0.0, - max=1.0, - num=960.0, - sum=64.0, - sum_squares=64.0, - bucket_limit=[ - 0.0, 1e-12, 0.917246389039776, 1.0089710279437536, - 1.7976931348623157e+308 - ], - bucket=[0.0, 896.0, 0.0, 64.0, 0.0]) - histogram_event = ea.HistogramEvent(0, 0, histogram_values) - compressed_event = ea._CompressHistogram(histogram_event, bps) - vals = compressed_event.compressed_histogram_values - self.assertEquals(tuple(v.basis_point for v in vals), bps) - self.assertAlmostEqual(vals[0].value, 0.0) - self.assertAlmostEqual(vals[1].value, 7.157142857142856e-14) - self.assertAlmostEqual(vals[2].value, 1.7003571428571426e-13) - self.assertAlmostEqual(vals[3].value, 3.305357142857143e-13) - self.assertAlmostEqual(vals[4].value, 5.357142857142857e-13) - self.assertAlmostEqual(vals[5].value, 7.408928571428571e-13) - self.assertAlmostEqual(vals[6].value, 9.013928571428571e-13) - self.assertAlmostEqual(vals[7].value, 9.998571428571429e-13) - self.assertAlmostEqual(vals[8].value, 1.0) - - def testImages(self): - gen = _EventGenerator(self) - acc = ea.EventAccumulator(gen) - im1 = ea.ImageEvent( - wall_time=1, - step=10, - encoded_image_string=b'big', - width=400, - height=300) - im2 = ea.ImageEvent( - wall_time=2, - step=12, - encoded_image_string=b'small', - width=40, - height=30) - gen.AddImage( - 'im1', - wall_time=1, - step=10, - encoded_image_string=b'big', - width=400, - height=300) - gen.AddImage( - 'im2', - wall_time=2, - step=12, - encoded_image_string=b'small', - width=40, - height=30) - acc.Reload() - self.assertEqual(acc.Images('im1'), [im1]) - self.assertEqual(acc.Images('im2'), [im2]) - - def testAudio(self): - gen = _EventGenerator(self) - acc = ea.EventAccumulator(gen) - snd1 = ea.AudioEvent( - wall_time=1, - step=10, - encoded_audio_string=b'big', - content_type='audio/wav', - sample_rate=44100, - length_frames=441000) - snd2 = ea.AudioEvent( - wall_time=2, - step=12, - encoded_audio_string=b'small', - content_type='audio/wav', - sample_rate=44100, - length_frames=44100) - gen.AddAudio( - 'snd1', - wall_time=1, - step=10, - encoded_audio_string=b'big', - content_type='audio/wav', - sample_rate=44100, - length_frames=441000) - gen.AddAudio( - 'snd2', - wall_time=2, - step=12, - encoded_audio_string=b'small', - content_type='audio/wav', - sample_rate=44100, - length_frames=44100) - acc.Reload() - self.assertEqual(acc.Audio('snd1'), [snd1]) - self.assertEqual(acc.Audio('snd2'), [snd2]) - - def testKeyError(self): - gen = _EventGenerator(self) - acc = ea.EventAccumulator(gen) - acc.Reload() - with self.assertRaises(KeyError): - acc.Scalars('s1') - with self.assertRaises(KeyError): - acc.Scalars('hst1') - with self.assertRaises(KeyError): - acc.Scalars('im1') - with self.assertRaises(KeyError): - acc.Histograms('s1') - with 
self.assertRaises(KeyError): - acc.Histograms('im1') - with self.assertRaises(KeyError): - acc.Images('s1') - with self.assertRaises(KeyError): - acc.Images('hst1') - with self.assertRaises(KeyError): - acc.Audio('s1') - with self.assertRaises(KeyError): - acc.Audio('hst1') - - def testNonValueEvents(self): - """Tests that non-value events in the generator don't cause early exits.""" - gen = _EventGenerator(self) - acc = ea.EventAccumulator(gen) - gen.AddScalar('s1', wall_time=1, step=10, value=20) - gen.AddEvent(event_pb2.Event(wall_time=2, step=20, file_version='nots2')) - gen.AddScalar('s3', wall_time=3, step=100, value=1) - gen.AddHistogram('hst1') - gen.AddImage('im1') - gen.AddAudio('snd1') - - acc.Reload() - self.assertTagsEqual(acc.Tags(), { - ea.IMAGES: ['im1'], - ea.AUDIO: ['snd1'], - ea.SCALARS: ['s1', 's3'], - ea.HISTOGRAMS: ['hst1'], - ea.COMPRESSED_HISTOGRAMS: ['hst1'], - }) - - def testExpiredDataDiscardedAfterRestartForFileVersionLessThan2(self): - """Tests that events are discarded after a restart is detected. - - If a step value is observed to be lower than what was previously seen, - this should force a discard of all previous items with the same tag - that are outdated. - - Only file versions < 2 use this out-of-order discard logic. Later versions - discard events based on the step value of SessionLog.START. - """ - warnings = [] - self.stubs.Set(logging, 'warn', warnings.append) - - gen = _EventGenerator(self) - acc = ea.EventAccumulator(gen) - - gen.AddEvent( - event_pb2.Event( - wall_time=0, step=0, file_version='brain.Event:1')) - gen.AddScalar('s1', wall_time=1, step=100, value=20) - gen.AddScalar('s1', wall_time=1, step=200, value=20) - gen.AddScalar('s1', wall_time=1, step=300, value=20) - acc.Reload() - ## Check that number of items are what they should be - self.assertEqual([x.step for x in acc.Scalars('s1')], [100, 200, 300]) - - gen.AddScalar('s1', wall_time=1, step=101, value=20) - gen.AddScalar('s1', wall_time=1, step=201, value=20) - gen.AddScalar('s1', wall_time=1, step=301, value=20) - acc.Reload() - ## Check that we have discarded 200 and 300 from s1 - self.assertEqual([x.step for x in acc.Scalars('s1')], [100, 101, 201, 301]) - - def testOrphanedDataNotDiscardedIfFlagUnset(self): - """Tests that events are not discarded if purge_orphaned_data is false. - """ - gen = _EventGenerator(self) - acc = ea.EventAccumulator(gen, purge_orphaned_data=False) - - gen.AddEvent( - event_pb2.Event( - wall_time=0, step=0, file_version='brain.Event:1')) - gen.AddScalar('s1', wall_time=1, step=100, value=20) - gen.AddScalar('s1', wall_time=1, step=200, value=20) - gen.AddScalar('s1', wall_time=1, step=300, value=20) - acc.Reload() - ## Check that number of items are what they should be - self.assertEqual([x.step for x in acc.Scalars('s1')], [100, 200, 300]) - - gen.AddScalar('s1', wall_time=1, step=101, value=20) - gen.AddScalar('s1', wall_time=1, step=201, value=20) - gen.AddScalar('s1', wall_time=1, step=301, value=20) - acc.Reload() - ## Check that we have discarded 200 and 300 from s1 - self.assertEqual([x.step for x in acc.Scalars('s1')], - [100, 200, 300, 101, 201, 301]) - - def testEventsDiscardedPerTagAfterRestartForFileVersionLessThan2(self): - """Tests that event discards after restart, only affect the misordered tag. - - If a step value is observed to be lower than what was previously seen, - this should force a discard of all previous items that are outdated, but - only for the out of order tag. Other tags should remain unaffected. 
- - Only file versions < 2 use this out-of-order discard logic. Later versions - discard events based on the step value of SessionLog.START. - """ - warnings = [] - self.stubs.Set(logging, 'warn', warnings.append) - - gen = _EventGenerator(self) - acc = ea.EventAccumulator(gen) - - gen.AddEvent( - event_pb2.Event( - wall_time=0, step=0, file_version='brain.Event:1')) - gen.AddScalar('s1', wall_time=1, step=100, value=20) - gen.AddScalar('s1', wall_time=1, step=200, value=20) - gen.AddScalar('s1', wall_time=1, step=300, value=20) - gen.AddScalar('s1', wall_time=1, step=101, value=20) - gen.AddScalar('s1', wall_time=1, step=201, value=20) - gen.AddScalar('s1', wall_time=1, step=301, value=20) - - gen.AddScalar('s2', wall_time=1, step=101, value=20) - gen.AddScalar('s2', wall_time=1, step=201, value=20) - gen.AddScalar('s2', wall_time=1, step=301, value=20) - - acc.Reload() - ## Check that we have discarded 200 and 300 - self.assertEqual([x.step for x in acc.Scalars('s1')], [100, 101, 201, 301]) - - ## Check that s1 discards do not affect s2 - ## i.e. check that only events from the out of order tag are discarded - self.assertEqual([x.step for x in acc.Scalars('s2')], [101, 201, 301]) - - def testOnlySummaryEventsTriggerDiscards(self): - """Test that file version event does not trigger data purge.""" - gen = _EventGenerator(self) - acc = ea.EventAccumulator(gen) - gen.AddScalar('s1', wall_time=1, step=100, value=20) - ev1 = event_pb2.Event(wall_time=2, step=0, file_version='brain.Event:1') - graph_bytes = graph_pb2.GraphDef().SerializeToString() - ev2 = event_pb2.Event(wall_time=3, step=0, graph_def=graph_bytes) - gen.AddEvent(ev1) - gen.AddEvent(ev2) - acc.Reload() - self.assertEqual([x.step for x in acc.Scalars('s1')], [100]) - - def testSessionLogStartMessageDiscardsExpiredEvents(self): - """Test that SessionLog.START message discards expired events. - - This discard logic is preferred over the out-of-order step discard logic, - but this logic can only be used for event protos which have the SessionLog - enum, which was introduced to event.proto for file_version >= brain.Event:2. 
- """ - gen = _EventGenerator(self) - acc = ea.EventAccumulator(gen) - gen.AddEvent( - event_pb2.Event( - wall_time=0, step=1, file_version='brain.Event:2')) - - gen.AddScalar('s1', wall_time=1, step=100, value=20) - gen.AddScalar('s1', wall_time=1, step=200, value=20) - gen.AddScalar('s1', wall_time=1, step=300, value=20) - gen.AddScalar('s1', wall_time=1, step=400, value=20) - - gen.AddScalar('s2', wall_time=1, step=202, value=20) - gen.AddScalar('s2', wall_time=1, step=203, value=20) - - slog = event_pb2.SessionLog(status=event_pb2.SessionLog.START) - gen.AddEvent(event_pb2.Event(wall_time=2, step=201, session_log=slog)) - acc.Reload() - self.assertEqual([x.step for x in acc.Scalars('s1')], [100, 200]) - self.assertEqual([x.step for x in acc.Scalars('s2')], []) - - def testFirstEventTimestamp(self): - """Test that FirstEventTimestamp() returns wall_time of the first event.""" - gen = _EventGenerator(self) - acc = ea.EventAccumulator(gen) - gen.AddEvent( - event_pb2.Event( - wall_time=10, step=20, file_version='brain.Event:2')) - gen.AddScalar('s1', wall_time=30, step=40, value=20) - self.assertEqual(acc.FirstEventTimestamp(), 10) - - def testReloadPopulatesFirstEventTimestamp(self): - """Test that Reload() means FirstEventTimestamp() won't load events.""" - gen = _EventGenerator(self) - acc = ea.EventAccumulator(gen) - gen.AddEvent( - event_pb2.Event( - wall_time=1, step=2, file_version='brain.Event:2')) - - acc.Reload() - - def _Die(*args, **kwargs): # pylint: disable=unused-argument - raise RuntimeError('Load() should not be called') - - self.stubs.Set(gen, 'Load', _Die) - self.assertEqual(acc.FirstEventTimestamp(), 1) - - def testFirstEventTimestampLoadsEvent(self): - """Test that FirstEventTimestamp() doesn't discard the loaded event.""" - gen = _EventGenerator(self) - acc = ea.EventAccumulator(gen) - gen.AddEvent( - event_pb2.Event( - wall_time=1, step=2, file_version='brain.Event:2')) - - self.assertEqual(acc.FirstEventTimestamp(), 1) - acc.Reload() - self.assertEqual(acc.file_version, 2.0) - - def testTFSummaryScalar(self): - """Verify processing of tf.summary.scalar.""" - event_sink = _EventGenerator(self, zero_out_timestamps=True) - writer = SummaryToEventTransformer(event_sink) - with self.test_session() as sess: - ipt = array_ops.placeholder(dtypes.float32) - summary_lib.scalar('scalar1', ipt) - summary_lib.scalar('scalar2', ipt * ipt) - merged = summary_lib.merge_all() - writer.add_graph(sess.graph) - for i in xrange(10): - summ = sess.run(merged, feed_dict={ipt: i}) - writer.add_summary(summ, global_step=i) - - accumulator = ea.EventAccumulator(event_sink) - accumulator.Reload() - - seq1 = [ea.ScalarEvent(wall_time=0, step=i, value=i) for i in xrange(10)] - seq2 = [ - ea.ScalarEvent( - wall_time=0, step=i, value=i * i) for i in xrange(10) - ] - - self.assertTagsEqual(accumulator.Tags(), { - ea.SCALARS: ['scalar1', 'scalar2'], - ea.GRAPH: True, - ea.META_GRAPH: False, - }) - - self.assertEqual(accumulator.Scalars('scalar1'), seq1) - self.assertEqual(accumulator.Scalars('scalar2'), seq2) - first_value = accumulator.Scalars('scalar1')[0].value - self.assertTrue(isinstance(first_value, float)) - - def testTFSummaryImage(self): - """Verify processing of tf.summary.image.""" - event_sink = _EventGenerator(self, zero_out_timestamps=True) - writer = SummaryToEventTransformer(event_sink) - with self.test_session() as sess: - ipt = array_ops.ones([10, 4, 4, 3], dtypes.uint8) - # This is an interesting example, because the old tf.image_summary op - # would throw an error here, because 
it would be tag reuse. - # Using the tf node name instead allows argument re-use to the image - # summary. - with ops.name_scope('1'): - summary_lib.image('images', ipt, max_outputs=1) - with ops.name_scope('2'): - summary_lib.image('images', ipt, max_outputs=2) - with ops.name_scope('3'): - summary_lib.image('images', ipt, max_outputs=3) - merged = summary_lib.merge_all() - writer.add_graph(sess.graph) - for i in xrange(10): - summ = sess.run(merged) - writer.add_summary(summ, global_step=i) - - accumulator = ea.EventAccumulator(event_sink) - accumulator.Reload() - - tags = [ - u'1/images/image', u'2/images/image/0', u'2/images/image/1', - u'3/images/image/0', u'3/images/image/1', u'3/images/image/2' - ] - - self.assertTagsEqual(accumulator.Tags(), { - ea.IMAGES: tags, - ea.GRAPH: True, - ea.META_GRAPH: False, - }) - - def testTFSummaryTensor(self): - """Verify processing of tf.summary.tensor.""" - event_sink = _EventGenerator(self, zero_out_timestamps=True) - writer = SummaryToEventTransformer(event_sink) - with self.test_session() as sess: - summary_lib.tensor_summary('scalar', constant_op.constant(1.0)) - summary_lib.tensor_summary('vector', constant_op.constant( - [1.0, 2.0, 3.0])) - summary_lib.tensor_summary('string', - constant_op.constant(six.b('foobar'))) - merged = summary_lib.merge_all() - summ = sess.run(merged) - writer.add_summary(summ, 0) - - accumulator = ea.EventAccumulator(event_sink) - accumulator.Reload() - - self.assertTagsEqual(accumulator.Tags(), { - ea.TENSORS: ['scalar', 'vector', 'string'], - }) - - scalar_proto = accumulator.Tensors('scalar')[0].tensor_proto - scalar = tensor_util.MakeNdarray(scalar_proto) - vector_proto = accumulator.Tensors('vector')[0].tensor_proto - vector = tensor_util.MakeNdarray(vector_proto) - string_proto = accumulator.Tensors('string')[0].tensor_proto - string = tensor_util.MakeNdarray(string_proto) - - self.assertTrue(np.array_equal(scalar, 1.0)) - self.assertTrue(np.array_equal(vector, [1.0, 2.0, 3.0])) - self.assertTrue(np.array_equal(string, six.b('foobar'))) - - -class RealisticEventAccumulatorTest(EventAccumulatorTest): - - def setUp(self): - super(RealisticEventAccumulatorTest, self).setUp() - - def testScalarsRealistically(self): - """Test accumulator by writing values and then reading them.""" - - def FakeScalarSummary(tag, value): - value = summary_pb2.Summary.Value(tag=tag, simple_value=value) - summary = summary_pb2.Summary(value=[value]) - return summary - - directory = os.path.join(self.get_temp_dir(), 'values_dir') - if gfile.IsDirectory(directory): - gfile.DeleteRecursively(directory) - gfile.MkDir(directory) - - writer = writer_lib.FileWriter(directory, max_queue=100) - - with ops.Graph().as_default() as graph: - _ = constant_op.constant([2.0, 1.0]) - # Add a graph to the summary writer. - writer.add_graph(graph) - meta_graph_def = saver.export_meta_graph( - graph_def=graph.as_graph_def(add_shapes=True)) - writer.add_meta_graph(meta_graph_def) - - run_metadata = config_pb2.RunMetadata() - device_stats = run_metadata.step_stats.dev_stats.add() - device_stats.device = 'test device' - writer.add_run_metadata(run_metadata, 'test run') - - # Write a bunch of events using the writer. 
- for i in xrange(30): - summ_id = FakeScalarSummary('id', i) - summ_sq = FakeScalarSummary('sq', i * i) - writer.add_summary(summ_id, i * 5) - writer.add_summary(summ_sq, i * 5) - writer.flush() - - # Verify that we can load those events properly - acc = ea.EventAccumulator(directory) - acc.Reload() - self.assertTagsEqual(acc.Tags(), { - ea.SCALARS: ['id', 'sq'], - ea.GRAPH: True, - ea.META_GRAPH: True, - ea.RUN_METADATA: ['test run'], - }) - id_events = acc.Scalars('id') - sq_events = acc.Scalars('sq') - self.assertEqual(30, len(id_events)) - self.assertEqual(30, len(sq_events)) - for i in xrange(30): - self.assertEqual(i * 5, id_events[i].step) - self.assertEqual(i * 5, sq_events[i].step) - self.assertEqual(i, id_events[i].value) - self.assertEqual(i * i, sq_events[i].value) - - # Write a few more events to test incremental reloading - for i in xrange(30, 40): - summ_id = FakeScalarSummary('id', i) - summ_sq = FakeScalarSummary('sq', i * i) - writer.add_summary(summ_id, i * 5) - writer.add_summary(summ_sq, i * 5) - writer.flush() - - # Verify we can now see all of the data - acc.Reload() - id_events = acc.Scalars('id') - sq_events = acc.Scalars('sq') - self.assertEqual(40, len(id_events)) - self.assertEqual(40, len(sq_events)) - for i in xrange(40): - self.assertEqual(i * 5, id_events[i].step) - self.assertEqual(i * 5, sq_events[i].step) - self.assertEqual(i, id_events[i].value) - self.assertEqual(i * i, sq_events[i].value) - self.assertProtoEquals(graph.as_graph_def(add_shapes=True), acc.Graph()) - self.assertProtoEquals(meta_graph_def, acc.MetaGraph()) - - def testGraphFromMetaGraphBecomesAvailable(self): - """Test accumulator by writing values and then reading them.""" - - directory = os.path.join(self.get_temp_dir(), 'metagraph_test_values_dir') - if gfile.IsDirectory(directory): - gfile.DeleteRecursively(directory) - gfile.MkDir(directory) - - writer = writer_lib.FileWriter(directory, max_queue=100) - - with ops.Graph().as_default() as graph: - _ = constant_op.constant([2.0, 1.0]) - # Add a graph to the summary writer. - meta_graph_def = saver.export_meta_graph( - graph_def=graph.as_graph_def(add_shapes=True)) - writer.add_meta_graph(meta_graph_def) - - writer.flush() - - # Verify that we can load those events properly - acc = ea.EventAccumulator(directory) - acc.Reload() - self.assertTagsEqual(acc.Tags(), { - ea.GRAPH: True, - ea.META_GRAPH: True, - }) - self.assertProtoEquals(graph.as_graph_def(add_shapes=True), acc.Graph()) - self.assertProtoEquals(meta_graph_def, acc.MetaGraph()) - - -if __name__ == '__main__': - test.main() diff --git a/tensorflow/python/summary/event_file_inspector.py b/tensorflow/python/summary/event_file_inspector.py deleted file mode 100644 index 3f56bb58e1..0000000000 --- a/tensorflow/python/summary/event_file_inspector.py +++ /dev/null @@ -1,427 +0,0 @@ -# Copyright 2015 The TensorFlow Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# ============================================================================== - -"""Logic for TensorBoard inspector to help humans investigate event files. - -Example usages: -tensorboard --inspect --event_file=myevents.out -tensorboard --inspect --event_file=myevents.out --tag=loss -tensorboard --inspect --logdir=mylogdir -tensorboard --inspect --logdir=mylogdir --tag=loss - - -This script runs over a logdir and creates an InspectionUnit for every -subdirectory with event files. If running over an event file, it creates only -one InspectionUnit. One block of output is printed to console for each -InspectionUnit. - -The primary content of an InspectionUnit is the dict field_to_obs that maps -fields (e.g. "scalar", "histogram", "session_log:start", etc.) to a list of -Observations for the field. Observations correspond one-to-one with Events in an -event file but contain less information because they only store what is -necessary to generate the final console output. - -The final output is rendered to console by applying some aggregating function -to the lists of Observations. Different functions are applied depending on the -type of field. For instance, for "scalar" fields, the inspector shows aggregate -statistics. For other fields like "session_log:start", all observed steps are -printed in order to aid debugging. - - -[1] Query a logdir or an event file for its logged tags and summary statistics -using --logdir or --event_file. - -[[event_file]] contains these tags: -histograms - binary/Sign/Activations - binary/nn_tanh/act/Activations - binary/nn_tanh/biases - binary/nn_tanh/biases:gradient - binary/nn_tanh/weights - binary/nn_tanh/weights:gradient -images - input_images/image/0 - input_images/image/1 - input_images/image/2 -scalars - Learning Rate - Total Cost - Total Cost (raw) - -Debug output aggregated over all tags: -graph - first_step 0 - last_step 0 - max_step 0 - min_step 0 - num_steps 1 - outoforder_steps [] -histograms - first_step 491 - last_step 659823 - max_step 659823 - min_step 491 - num_steps 993 - outoforder_steps [] -images - -scalars - first_step 0 - last_step 659823 - max_step 659823 - min_step 0 - num_steps 1985 - outoforder_steps [] -sessionlog:checkpoint - first_step 7129 - last_step 657167 - max_step 657167 - min_step 7129 - num_steps 99 - outoforder_steps [] -sessionlog:start - outoforder_steps [] - steps [0L] -sessionlog:stop - - - -[2] Drill down into a particular tag using --tag. - -Debug output for binary/Sign/Activations: -histograms - first_step 491 - last_step 659823 - max_step 659823 - min_step 491 - num_steps 993 - outoforder_steps [] -""" - -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import collections -import itertools -import os - -from tensorflow.core.util.event_pb2 import SessionLog -from tensorflow.python.platform import app -from tensorflow.python.platform import flags -from tensorflow.python.platform import gfile -from tensorflow.python.summary import event_accumulator -from tensorflow.python.summary import event_multiplexer -from tensorflow.python.summary.impl import event_file_loader - -FLAGS = flags.FLAGS - - -# Map of field names within summary.proto to the user-facing names that this -# script outputs. 
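To make that mapping concrete before the dict itself appears below, here is a small self-contained sketch of how a summary value gets classified. `classify_summary_values` is a hypothetical helper, written for illustration; it mirrors the loop inside `get_field_to_observations_map` further down.

```python
from tensorflow.core.util import event_pb2

def classify_summary_values(event, type_to_field):
  """Yields a (field_name, tag) pair for each summary value in the event."""
  if not event.HasField('summary'):
    return
  for value in event.summary.value:
    for proto_name, field_name in type_to_field.items():
      if value.HasField(proto_name):
        yield field_name, value.tag

# A scalar summary value classifies as ('scalars', 'loss').
event = event_pb2.Event(step=1, wall_time=123.0)
event.summary.value.add(tag='loss', simple_value=0.5)
print(list(classify_summary_values(event, {'simple_value': 'scalars',
                                           'histo': 'histograms'})))
# -> [('scalars', 'loss')]
```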
-SUMMARY_TYPE_TO_FIELD = {'simple_value': 'scalars',
-                         'histo': 'histograms',
-                         'image': 'images',
-                         'audio': 'audio'}
-for summary_type in event_accumulator.SUMMARY_TYPES:
-  if summary_type not in SUMMARY_TYPE_TO_FIELD:
-    SUMMARY_TYPE_TO_FIELD[summary_type] = summary_type
-
-# Types of summaries that we may want to query for by tag.
-TAG_FIELDS = list(SUMMARY_TYPE_TO_FIELD.values())
-
-# Summaries that we want to see every instance of.
-LONG_FIELDS = ['sessionlog:start', 'sessionlog:stop']
-
-# Summaries that we only want an abridged digest of, since they would
-# take too much screen real estate otherwise.
-SHORT_FIELDS = ['graph', 'sessionlog:checkpoint'] + TAG_FIELDS
-
-# All summary types that we can inspect.
-TRACKED_FIELDS = SHORT_FIELDS + LONG_FIELDS
-
-# An `Observation` contains the data within each Event file that the inspector
-# cares about. The inspector accumulates Observations as it processes events.
-Observation = collections.namedtuple('Observation', ['step', 'wall_time',
-                                                     'tag'])
-
-# An InspectionUnit is created for each organizational structure in the event
-# files visible in the final terminal output. For instance, one InspectionUnit
-# is created for each subdirectory in logdir. When asked to inspect a single
-# event file, there may only be one InspectionUnit.
-
-# The InspectionUnit contains the `name` of the organizational unit that will
-# be printed to console, a `generator` that yields `Event` protos, and a
-# mapping from string fields to `Observations` that the inspector creates.
-InspectionUnit = collections.namedtuple('InspectionUnit', ['name', 'generator',
-                                                           'field_to_obs'])
-
-PRINT_SEPARATOR = '=' * 70 + '\n'
-
-
-def get_field_to_observations_map(generator, query_for_tag=''):
-  """Return a field-to-`Observations` dict for the event generator.
-
-  Args:
-    generator: A generator over event protos.
-    query_for_tag: If specified, only create observations for events whose tag
-      matches this string.
-
-  Returns:
-    A dict mapping keys in `TRACKED_FIELDS` to an `Observation` list.
-  """
-
-  def increment(stat, event, tag=''):
-    assert stat in TRACKED_FIELDS
-    field_to_obs[stat].append(Observation(step=event.step,
-                                          wall_time=event.wall_time,
-                                          tag=tag)._asdict())
-
-  field_to_obs = dict([(t, []) for t in TRACKED_FIELDS])
-
-  for event in generator:
-    ## Process the event.
-    if event.HasField('graph_def') and (not query_for_tag):
-      increment('graph', event)
-    if event.HasField('session_log') and (not query_for_tag):
-      status = event.session_log.status
-      if status == SessionLog.START:
-        increment('sessionlog:start', event)
-      elif status == SessionLog.STOP:
-        increment('sessionlog:stop', event)
-      elif status == SessionLog.CHECKPOINT:
-        increment('sessionlog:checkpoint', event)
-    elif event.HasField('summary'):
-      for value in event.summary.value:
-        if query_for_tag and value.tag != query_for_tag:
-          continue
-
-        for proto_name, display_name in SUMMARY_TYPE_TO_FIELD.items():
-          if value.HasField(proto_name):
-            increment(display_name, event, value.tag)
-  return field_to_obs
-
-
-def get_unique_tags(field_to_obs):
-  """Returns a dictionary of tags that a user could query over.
-
-  Args:
-    field_to_obs: Dict that maps string field to `Observation` list.
-
-  Returns:
-    A dict that maps keys in `TAG_FIELDS` to a list of string tags present in
-    the event files. If the dict does not have any observations of the type,
-    maps to an empty list so that we can render this to console.
-  """
-  return {field: sorted(set([x.get('tag', '') for x in observations]))
-          for field, observations in field_to_obs.items()
-          if field in TAG_FIELDS}
-
-
-def print_dict(d, show_missing=True):
-  """Prints a shallow dict to console.
-
-  Args:
-    d: Dict to print.
-    show_missing: Whether to show keys with empty values.
-  """
-  for k, v in sorted(d.items()):
-    if (not v) and show_missing:
-      # No instances of the key, so print the missing symbol.
-      print('{} -'.format(k))
-    elif isinstance(v, list):
-      # Value is a list, so print each item of the list.
-      print(k)
-      for item in v:
-        print('   {}'.format(item))
-    elif isinstance(v, dict):
-      # Value is a dict, so print each (key, value) pair of the dict.
-      print(k)
-      for kk, vv in sorted(v.items()):
-        print('   {:<20} {}'.format(kk, vv))
-
-
-def get_dict_to_print(field_to_obs):
-  """Transform the field-to-obs mapping into a printable dictionary.
-
-  Args:
-    field_to_obs: Dict that maps string field to `Observation` list.
-
-  Returns:
-    A dict with the keys and values to print to console.
-  """
-
-  def compressed_steps(steps):
-    return {'num_steps': len(set(steps)),
-            'min_step': min(steps),
-            'max_step': max(steps),
-            'last_step': steps[-1],
-            'first_step': steps[0],
-            'outoforder_steps': get_out_of_order(steps)}
-
-  def full_steps(steps):
-    return {'steps': steps, 'outoforder_steps': get_out_of_order(steps)}
-
-  output = {}
-  for field, observations in field_to_obs.items():
-    if not observations:
-      output[field] = None
-      continue
-
-    steps = [x['step'] for x in observations]
-    if field in SHORT_FIELDS:
-      output[field] = compressed_steps(steps)
-    if field in LONG_FIELDS:
-      output[field] = full_steps(steps)
-
-  return output
-
-
-def get_out_of_order(list_of_numbers):
-  """Returns elements that break the monotonically non-decreasing trend.
-
-  This is used to find instances of global step values that are "out-of-order",
-  which may trigger TensorBoard event discarding logic.
-
-  Args:
-    list_of_numbers: A list of numbers.
-
-  Returns:
-    A list of (previous, current) pairs of adjacent elements in which the
-    second element is lower than the first.
-  """
-  # TODO(cassandrax): Consider changing this to only check for out-of-order
-  # steps within a particular tag.
-  result = []
-  for i in range(len(list_of_numbers)):
-    if i == 0:
-      continue
-    if list_of_numbers[i] < list_of_numbers[i - 1]:
-      result.append((list_of_numbers[i - 1], list_of_numbers[i]))
-  return result
-
-
-def generators_from_logdir(logdir):
-  """Returns a list of event generators for subdirectories with event files.
-
-  The number of generators returned should equal the number of directories
-  within logdir that contain event files. If only logdir contains event files,
-  returns a list of length one.
-
-  Args:
-    logdir: A log directory that contains event files.
-
-  Returns:
-    List of event generators for each subdirectory with event files.
-  """
-  subdirs = event_multiplexer.GetLogdirSubdirectories(logdir)
-  generators = [itertools.chain(*[
-      generator_from_event_file(os.path.join(subdir, f))
-      for f in gfile.ListDirectory(subdir)
-      if event_accumulator.IsTensorFlowEventsFile(os.path.join(subdir, f))
-  ]) for subdir in subdirs]
-  return generators
-
-
-def generator_from_event_file(event_file):
-  """Returns a generator that yields events from an event file."""
-  return event_file_loader.EventFileLoader(event_file).Load()
-
-
-def get_inspection_units(logdir='', event_file='', tag=''):
-  """Returns a list of InspectionUnit objects given either logdir or event_file.
-
-  If logdir is given, the number of InspectionUnits should equal the
-  number of directories or subdirectories that contain event files.
-
-  If event_file is given, the number of InspectionUnits should be 1.
-
-  Args:
-    logdir: A log directory that contains event files.
-    event_file: Alternatively, a path to a particular event file.
-    tag: An optional tag name to query for.
-
-  Returns:
-    A list of InspectionUnit objects.
-  """
-  if logdir:
-    subdirs = event_multiplexer.GetLogdirSubdirectories(logdir)
-    inspection_units = []
-    for subdir in subdirs:
-      generator = itertools.chain(*[
-          generator_from_event_file(os.path.join(subdir, f))
-          for f in gfile.ListDirectory(subdir)
-          if event_accumulator.IsTensorFlowEventsFile(os.path.join(subdir, f))
-      ])
-      inspection_units.append(InspectionUnit(
-          name=subdir,
-          generator=generator,
-          field_to_obs=get_field_to_observations_map(generator, tag)))
-    if inspection_units:
-      print('Found event files in:\n{}\n'.format('\n'.join(
-          [u.name for u in inspection_units])))
-    elif event_accumulator.IsTensorFlowEventsFile(logdir):
-      print(
-          'It seems that {} may be an event file instead of a logdir. If this '
-          'is the case, use --event_file instead of --logdir to pass '
-          'it in.'.format(logdir))
-    else:
-      print('No event files found within logdir {}'.format(logdir))
-    return inspection_units
-  elif event_file:
-    generator = generator_from_event_file(event_file)
-    return [InspectionUnit(
-        name=event_file,
-        generator=generator,
-        field_to_obs=get_field_to_observations_map(generator, tag))]
-
-
-def inspect(logdir='', event_file='', tag=''):
-  """Main function for inspector that prints out a digest of event files.
-
-  Args:
-    logdir: A log directory that contains event files.
-    event_file: Alternatively, a path to a particular event file.
-    tag: An optional tag name to query for.
-
-  Raises:
-    ValueError: If neither logdir nor event_file is given, or if both are
-      given.
-  """
-  if logdir and event_file:
-    raise ValueError(
-        'Must specify either --logdir or --event_file, but not both.')
-  if not (logdir or event_file):
-    raise ValueError('Must specify either --logdir or --event_file.')
-
-  print(PRINT_SEPARATOR +
-        'Processing event files... (this can take a few minutes)\n' +
-        PRINT_SEPARATOR)
-  inspection_units = get_inspection_units(logdir, event_file, tag)
-
-  for unit in inspection_units:
-    if tag:
-      print('Event statistics for tag {} in {}:'.format(tag, unit.name))
-    else:
-      # If the user is not inspecting a particular tag, also print the list of
-      # all available tags that they can query.
-      print('These tags are in {}:'.format(unit.name))
-      print_dict(get_unique_tags(unit.field_to_obs))
-      print(PRINT_SEPARATOR)
-      print('Event statistics for {}:'.format(unit.name))
-
-    print_dict(get_dict_to_print(unit.field_to_obs), show_missing=(not tag))
-    print(PRINT_SEPARATOR)
-
-
-if __name__ == '__main__':
-  app.run()
diff --git a/tensorflow/python/summary/event_file_inspector_test.py b/tensorflow/python/summary/event_file_inspector_test.py
deleted file mode 100644
index 940586cf97..0000000000
--- a/tensorflow/python/summary/event_file_inspector_test.py
+++ /dev/null
@@ -1,172 +0,0 @@
-# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================== - -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import os -import shutil - -from tensorflow.core.framework.summary_pb2 import HistogramProto -from tensorflow.core.framework.summary_pb2 import Summary -from tensorflow.core.util.event_pb2 import SessionLog -from tensorflow.python.framework import test_util -from tensorflow.python.platform import googletest -from tensorflow.python.summary import event_file_inspector as efi -from tensorflow.python.training.summary_io import SummaryWriter - - -class EventFileInspectorTest(test_util.TensorFlowTestCase): - - def setUp(self): - self.logdir = os.path.join(self.get_temp_dir(), 'tfevents') - self._MakeDirectoryIfNotExists(self.logdir) - - def tearDown(self): - shutil.rmtree(self.logdir) - - def _MakeDirectoryIfNotExists(self, path): - if not os.path.exists(path): - os.mkdir(path) - - def _WriteScalarSummaries(self, data, subdirs=('',)): - # Writes data to a tempfile in subdirs, and returns generator for the data. - # If subdirs is given, writes data identically to all subdirectories. - for subdir_ in subdirs: - subdir = os.path.join(self.logdir, subdir_) - self._MakeDirectoryIfNotExists(subdir) - - sw = SummaryWriter(subdir) - for datum in data: - summary = Summary() - if 'simple_value' in datum: - summary.value.add(tag=datum['tag'], - simple_value=datum['simple_value']) - sw.add_summary(summary, global_step=datum['step']) - elif 'histo' in datum: - summary.value.add(tag=datum['tag'], histo=HistogramProto()) - sw.add_summary(summary, global_step=datum['step']) - elif 'session_log' in datum: - sw.add_session_log(datum['session_log'], global_step=datum['step']) - sw.close() - - def testEmptyLogdir(self): - # Nothing was written to logdir - units = efi.get_inspection_units(self.logdir) - self.assertEqual([], units) - - def testGetAvailableTags(self): - data = [{'tag': 'c', 'histo': 2, 'step': 10}, - {'tag': 'c', 'histo': 2, 'step': 11}, - {'tag': 'c', 'histo': 2, 'step': 9}, - {'tag': 'b', 'simple_value': 2, 'step': 20}, - {'tag': 'b', 'simple_value': 2, 'step': 15}, - {'tag': 'a', 'simple_value': 2, 'step': 3}] - self._WriteScalarSummaries(data) - units = efi.get_inspection_units(self.logdir) - tags = efi.get_unique_tags(units[0].field_to_obs) - self.assertEqual(['a', 'b'], tags['scalars']) - self.assertEqual(['c'], tags['histograms']) - - def testInspectAll(self): - data = [{'tag': 'c', 'histo': 2, 'step': 10}, - {'tag': 'c', 'histo': 2, 'step': 11}, - {'tag': 'c', 'histo': 2, 'step': 9}, - {'tag': 'b', 'simple_value': 2, 'step': 20}, - {'tag': 'b', 'simple_value': 2, 'step': 15}, - {'tag': 'a', 'simple_value': 2, 'step': 3}] - self._WriteScalarSummaries(data) - units = efi.get_inspection_units(self.logdir) - printable = efi.get_dict_to_print(units[0].field_to_obs) - self.assertEqual(printable['histograms']['max_step'], 11) - self.assertEqual(printable['histograms']['min_step'], 9) - self.assertEqual(printable['histograms']['num_steps'], 3) - 
self.assertEqual(printable['histograms']['last_step'], 9) - self.assertEqual(printable['histograms']['first_step'], 10) - self.assertEqual(printable['histograms']['outoforder_steps'], [(11, 9)]) - - self.assertEqual(printable['scalars']['max_step'], 20) - self.assertEqual(printable['scalars']['min_step'], 3) - self.assertEqual(printable['scalars']['num_steps'], 3) - self.assertEqual(printable['scalars']['last_step'], 3) - self.assertEqual(printable['scalars']['first_step'], 20) - self.assertEqual(printable['scalars']['outoforder_steps'], [(20, 15), - (15, 3)]) - - def testInspectTag(self): - data = [{'tag': 'c', 'histo': 2, 'step': 10}, - {'tag': 'c', 'histo': 2, 'step': 11}, - {'tag': 'c', 'histo': 2, 'step': 9}, - {'tag': 'b', 'histo': 2, 'step': 20}, - {'tag': 'b', 'simple_value': 2, 'step': 15}, - {'tag': 'a', 'simple_value': 2, 'step': 3}] - self._WriteScalarSummaries(data) - units = efi.get_inspection_units(self.logdir, tag='c') - printable = efi.get_dict_to_print(units[0].field_to_obs) - self.assertEqual(printable['histograms']['max_step'], 11) - self.assertEqual(printable['histograms']['min_step'], 9) - self.assertEqual(printable['histograms']['num_steps'], 3) - self.assertEqual(printable['histograms']['last_step'], 9) - self.assertEqual(printable['histograms']['first_step'], 10) - self.assertEqual(printable['histograms']['outoforder_steps'], [(11, 9)]) - self.assertEqual(printable['scalars'], None) - - def testSessionLogSummaries(self): - data = [ - {'session_log': SessionLog(status=SessionLog.START), 'step': 0}, - {'session_log': SessionLog(status=SessionLog.CHECKPOINT), 'step': 1}, - {'session_log': SessionLog(status=SessionLog.CHECKPOINT), 'step': 2}, - {'session_log': SessionLog(status=SessionLog.CHECKPOINT), 'step': 3}, - {'session_log': SessionLog(status=SessionLog.STOP), 'step': 4}, - {'session_log': SessionLog(status=SessionLog.START), 'step': 5}, - {'session_log': SessionLog(status=SessionLog.STOP), 'step': 6}, - ] - - self._WriteScalarSummaries(data) - units = efi.get_inspection_units(self.logdir) - self.assertEqual(1, len(units)) - printable = efi.get_dict_to_print(units[0].field_to_obs) - self.assertEqual(printable['sessionlog:start']['steps'], [0, 5]) - self.assertEqual(printable['sessionlog:stop']['steps'], [4, 6]) - self.assertEqual(printable['sessionlog:checkpoint']['num_steps'], 3) - - def testInspectAllWithNestedLogdirs(self): - data = [{'tag': 'c', 'simple_value': 2, 'step': 10}, - {'tag': 'c', 'simple_value': 2, 'step': 11}, - {'tag': 'c', 'simple_value': 2, 'step': 9}, - {'tag': 'b', 'simple_value': 2, 'step': 20}, - {'tag': 'b', 'simple_value': 2, 'step': 15}, - {'tag': 'a', 'simple_value': 2, 'step': 3}] - - subdirs = ['eval', 'train'] - self._WriteScalarSummaries(data, subdirs=subdirs) - units = efi.get_inspection_units(self.logdir) - self.assertEqual(2, len(units)) - directory_names = [os.path.join(self.logdir, name) for name in subdirs] - self.assertEqual(directory_names, sorted([unit.name for unit in units])) - - for unit in units: - printable = efi.get_dict_to_print(unit.field_to_obs)['scalars'] - self.assertEqual(printable['max_step'], 20) - self.assertEqual(printable['min_step'], 3) - self.assertEqual(printable['num_steps'], 6) - self.assertEqual(printable['last_step'], 3) - self.assertEqual(printable['first_step'], 10) - self.assertEqual(printable['outoforder_steps'], [(11, 9), (20, 15), - (15, 3)]) - -if __name__ == '__main__': - googletest.main() diff --git a/tensorflow/python/summary/event_multiplexer.py 
b/tensorflow/python/summary/event_multiplexer.py
deleted file mode 100644
index 18176e10fe..0000000000
--- a/tensorflow/python/summary/event_multiplexer.py
+++ /dev/null
@@ -1,428 +0,0 @@
-# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ==============================================================================
-"""Provides an interface for working with multiple event files."""
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-import os
-import threading
-
-import six
-
-from tensorflow.python.platform import gfile
-from tensorflow.python.platform import tf_logging as logging
-from tensorflow.python.summary import event_accumulator
-from tensorflow.python.summary.impl import directory_watcher
-from tensorflow.python.summary.impl import io_wrapper
-
-
-class EventMultiplexer(object):
-  """An `EventMultiplexer` manages access to multiple `EventAccumulator`s.
-
-  Each `EventAccumulator` is associated with a `run`, which is a self-contained
-  TensorFlow execution. The `EventMultiplexer` provides methods for extracting
-  information about events from multiple `run`s.
-
-  Example usage for loading specific runs from files:
-
-  ```python
-  x = EventMultiplexer({'run1': 'path/to/run1', 'run2': 'path/to/run2'})
-  x.Reload()
-  ```
-
-  Example usage for loading a directory where each subdirectory is a run:
-
-  ```python
-  (eg:) /parent/directory/path/
-        /parent/directory/path/run1/
-        /parent/directory/path/run1/events.out.tfevents.1001
-        /parent/directory/path/run1/events.out.tfevents.1002
-
-        /parent/directory/path/run2/
-        /parent/directory/path/run2/events.out.tfevents.9232
-
-        /parent/directory/path/run3/
-        /parent/directory/path/run3/events.out.tfevents.9232
-  x = EventMultiplexer().AddRunsFromDirectory('/parent/directory/path')
-  (which is equivalent to:)
-  x = EventMultiplexer({'run1': '/parent/directory/path/run1', 'run2': ...})
-  ```
-
-  If you would like to watch `/parent/directory/path`, wait for it to be
-  created (if necessary), and then periodically pick up new runs, use
-  `AutoloadingMultiplexer`.
-  """
-
-  def __init__(self,
-               run_path_map=None,
-               size_guidance=event_accumulator.DEFAULT_SIZE_GUIDANCE,
-               purge_orphaned_data=True):
-    """Constructor for the `EventMultiplexer`.
-
-    Args:
-      run_path_map: Dict `{run: path}` which specifies the
-        name of a run, and the path to find the associated events. If it is
-        None, then the EventMultiplexer initializes without any runs.
-      size_guidance: A dictionary mapping from `tagType` to the number of items
-        to store for each tag of that type. See
-        `event_accumulator.EventAccumulator` for details.
-      purge_orphaned_data: Whether to discard any events that were "orphaned"
-        by a TensorFlow restart.
-    """
-    logging.info('Event Multiplexer initializing.')
-    self._accumulators_mutex = threading.Lock()
-    self._accumulators = {}
-    self._paths = {}
-    self._reload_called = False
-    self._size_guidance = size_guidance
-    self.purge_orphaned_data = purge_orphaned_data
-    if run_path_map is not None:
-      logging.info('Event Multiplexer doing initialization load for %s',
-                   run_path_map)
-      for (run, path) in six.iteritems(run_path_map):
-        self.AddRun(path, run)
-    logging.info('Event Multiplexer done initializing')
-
-  def AddRun(self, path, name=None):
-    """Add a run to the multiplexer.
-
-    If the name is not specified, it is the same as the path.
-
-    If a run by that name exists, and we are already watching the right path,
-    do nothing. If we are watching a different path, replace the event
-    accumulator.
-
-    If `Reload` has been called, it will `Reload` the newly created
-    accumulators.
-
-    Args:
-      path: Path to the event files (or event directory) for given run.
-      name: Name of the run to add. If not provided, is set to path.
-
-    Returns:
-      The `EventMultiplexer`.
-    """
-    if not name:
-      name = path
-    accumulator = None
-    with self._accumulators_mutex:
-      if name not in self._accumulators or self._paths[name] != path:
-        if name in self._paths and self._paths[name] != path:
-          # TODO(danmane) - Make it impossible to overwrite an old path with
-          # a new path (just give the new path a distinct name).
-          logging.warning('Conflict for name %s: old path %s, new path %s',
-                          name, self._paths[name], path)
-        logging.info('Constructing EventAccumulator for %s', path)
-        accumulator = event_accumulator.EventAccumulator(
-            path,
-            size_guidance=self._size_guidance,
-            purge_orphaned_data=self.purge_orphaned_data)
-        self._accumulators[name] = accumulator
-        self._paths[name] = path
-    if accumulator:
-      if self._reload_called:
-        accumulator.Reload()
-    return self
-
-  def AddRunsFromDirectory(self, path, name=None):
-    """Load runs from a directory; recursively walks subdirectories.
-
-    If path doesn't exist, no-op. This ensures that it is safe to call
-    `AddRunsFromDirectory` multiple times, even before the directory is made.
-
-    If path is a directory, load event files in the directory (if any exist)
-    and recursively call AddRunsFromDirectory on any subdirectories. This means
-    you can call AddRunsFromDirectory at the root of a tree of event logs and
-    TensorBoard will load them all.
-
-    If the `EventMultiplexer` is already loaded, this will cause
-    the newly created accumulators to `Reload()`.
-
-    Args:
-      path: A string path to a directory to load runs from.
-      name: Optionally, what name to apply to the runs. If name is provided
-        and the directory contains run subdirectories, the name of each subrun
-        is the concatenation of the parent name and the subdirectory name. If
-        name is provided and the directory contains event files, then a run
-        called "name" is added, containing the events from the path.
-
-    Raises:
-      ValueError: If the path exists and isn't a directory.
-
-    Returns:
-      The `EventMultiplexer`.
- """ - logging.info('Starting AddRunsFromDirectory: %s', path) - for subdir in GetLogdirSubdirectories(path): - logging.info('Adding events from directory %s', subdir) - rpath = os.path.relpath(subdir, path) - subname = os.path.join(name, rpath) if name else rpath - self.AddRun(subdir, name=subname) - logging.info('Done with AddRunsFromDirectory: %s', path) - return self - - def Reload(self): - """Call `Reload` on every `EventAccumulator`.""" - logging.info('Beginning EventMultiplexer.Reload()') - self._reload_called = True - # Build a list so we're safe even if the list of accumulators is modified - # even while we're reloading. - with self._accumulators_mutex: - items = list(self._accumulators.items()) - - names_to_delete = set() - for name, accumulator in items: - try: - accumulator.Reload() - except (OSError, IOError) as e: - logging.error("Unable to reload accumulator '%s': %s", name, e) - except directory_watcher.DirectoryDeletedError: - names_to_delete.add(name) - - with self._accumulators_mutex: - for name in names_to_delete: - logging.warning("Deleting accumulator '%s'", name) - del self._accumulators[name] - logging.info('Finished with EventMultiplexer.Reload()') - return self - - def FirstEventTimestamp(self, run): - """Return the timestamp of the first event of the given run. - - This may perform I/O if no events have been loaded yet for the run. - - Args: - run: A string name of the run for which the timestamp is retrieved. - - Returns: - The wall_time of the first event of the run, which will typically be - seconds since the epoch. - - Raises: - KeyError: If the run is not found. - ValueError: If the run has no events loaded and there are no events on - disk to load. - """ - accumulator = self._GetAccumulator(run) - return accumulator.FirstEventTimestamp() - - def Scalars(self, run, tag): - """Retrieve the scalar events associated with a run and tag. - - Args: - run: A string name of the run for which values are retrieved. - tag: A string name of the tag for which values are retrieved. - - Raises: - KeyError: If the run is not found, or the tag is not available for - the given run. - - Returns: - An array of `event_accumulator.ScalarEvents`. - """ - accumulator = self._GetAccumulator(run) - return accumulator.Scalars(tag) - - def HealthPills(self, run, node_name): - """Retrieve the health pill events associated with a run and node name. - - Args: - run: A string name of the run for which health pills are retrieved. - node_name: A string name of the node for which health pills are retrieved. - - Raises: - KeyError: If the run is not found, or the node name is not available for - the given run. - - Returns: - An array of `event_accumulator.HealthPillEvents`. - """ - accumulator = self._GetAccumulator(run) - return accumulator.HealthPills(node_name) - - def Graph(self, run): - """Retrieve the graph associated with the provided run. - - Args: - run: A string name of a run to load the graph for. - - Raises: - KeyError: If the run is not found. - ValueError: If the run does not have an associated graph. - - Returns: - The `GraphDef` protobuf data structure. - """ - accumulator = self._GetAccumulator(run) - return accumulator.Graph() - - def MetaGraph(self, run): - """Retrieve the metagraph associated with the provided run. - - Args: - run: A string name of a run to load the graph for. - - Raises: - KeyError: If the run is not found. - ValueError: If the run does not have an associated graph. - - Returns: - The `MetaGraphDef` protobuf data structure. 
- """ - accumulator = self._GetAccumulator(run) - return accumulator.MetaGraph() - - def RunMetadata(self, run, tag): - """Get the session.run() metadata associated with a TensorFlow run and tag. - - Args: - run: A string name of a TensorFlow run. - tag: A string name of the tag associated with a particular session.run(). - - Raises: - KeyError: If the run is not found, or the tag is not available for the - given run. - - Returns: - The metadata in the form of `RunMetadata` protobuf data structure. - """ - accumulator = self._GetAccumulator(run) - return accumulator.RunMetadata(tag) - - def Histograms(self, run, tag): - """Retrieve the histogram events associated with a run and tag. - - Args: - run: A string name of the run for which values are retrieved. - tag: A string name of the tag for which values are retrieved. - - Raises: - KeyError: If the run is not found, or the tag is not available for - the given run. - - Returns: - An array of `event_accumulator.HistogramEvents`. - """ - accumulator = self._GetAccumulator(run) - return accumulator.Histograms(tag) - - def CompressedHistograms(self, run, tag): - """Retrieve the compressed histogram events associated with a run and tag. - - Args: - run: A string name of the run for which values are retrieved. - tag: A string name of the tag for which values are retrieved. - - Raises: - KeyError: If the run is not found, or the tag is not available for - the given run. - - Returns: - An array of `event_accumulator.CompressedHistogramEvents`. - """ - accumulator = self._GetAccumulator(run) - return accumulator.CompressedHistograms(tag) - - def Images(self, run, tag): - """Retrieve the image events associated with a run and tag. - - Args: - run: A string name of the run for which values are retrieved. - tag: A string name of the tag for which values are retrieved. - - Raises: - KeyError: If the run is not found, or the tag is not available for - the given run. - - Returns: - An array of `event_accumulator.ImageEvents`. - """ - accumulator = self._GetAccumulator(run) - return accumulator.Images(tag) - - def Audio(self, run, tag): - """Retrieve the audio events associated with a run and tag. - - Args: - run: A string name of the run for which values are retrieved. - tag: A string name of the tag for which values are retrieved. - - Raises: - KeyError: If the run is not found, or the tag is not available for - the given run. - - Returns: - An array of `event_accumulator.AudioEvents`. - """ - accumulator = self._GetAccumulator(run) - return accumulator.Audio(tag) - - def Tensors(self, run, tag): - """Retrieve the tensor events associated with a run and tag. - - Args: - run: A string name of the run for which values are retrieved. - tag: A string name of the tag for which values are retrieved. - - Raises: - KeyError: If the run is not found, or the tag is not available for - the given run. - - Returns: - An array of `event_accumulator.TensorEvent`s. - """ - accumulator = self._GetAccumulator(run) - return accumulator.Tensors(tag) - - def Runs(self): - """Return all the run names in the `EventMultiplexer`. 
- - Returns: - ``` - {runName: { images: [tag1, tag2, tag3], - scalarValues: [tagA, tagB, tagC], - histograms: [tagX, tagY, tagZ], - compressedHistograms: [tagX, tagY, tagZ], - graph: true, meta_graph: true}} - ``` - """ - with self._accumulators_mutex: - # To avoid nested locks, we construct a copy of the run-accumulator map - items = list(six.iteritems(self._accumulators)) - return {run_name: accumulator.Tags() for run_name, accumulator in items} - - def RunPaths(self): - """Returns a dict mapping run names to event file paths.""" - return self._paths - - def _GetAccumulator(self, run): - with self._accumulators_mutex: - return self._accumulators[run] - - -def GetLogdirSubdirectories(path): - """Returns subdirectories with event files on path.""" - if gfile.Exists(path) and not gfile.IsDirectory(path): - raise ValueError('GetLogdirSubdirectories: path exists and is not a ' - 'directory, %s' % path) - - # ListRecursively just yields nothing if the path doesn't exist. - return ( - subdir - for (subdir, files) in io_wrapper.ListRecursively(path) - if list(filter(event_accumulator.IsTensorFlowEventsFile, files)) - ) diff --git a/tensorflow/python/summary/event_multiplexer_test.py b/tensorflow/python/summary/event_multiplexer_test.py deleted file mode 100644 index 8f78c6c547..0000000000 --- a/tensorflow/python/summary/event_multiplexer_test.py +++ /dev/null @@ -1,319 +0,0 @@ -# Copyright 2015 The TensorFlow Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# ============================================================================== - -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import os -import os.path -import shutil - -from tensorflow.python.framework import test_util -from tensorflow.python.platform import gfile -from tensorflow.python.platform import googletest -from tensorflow.python.summary import event_accumulator -from tensorflow.python.summary import event_multiplexer - - -def _AddEvents(path): - if not gfile.IsDirectory(path): - gfile.MakeDirs(path) - fpath = os.path.join(path, 'hypothetical.tfevents.out') - with gfile.GFile(fpath, 'w') as f: - f.write('') - return fpath - - -def _CreateCleanDirectory(path): - if gfile.IsDirectory(path): - gfile.DeleteRecursively(path) - gfile.MkDir(path) - - -class _FakeAccumulator(object): - - def __init__(self, path): - self._path = path - self.reload_called = False - self._node_names_to_health_pills = {'Add': ['hp1', 'hp2']} - - def Tags(self): - return {event_accumulator.IMAGES: ['im1', 'im2'], - event_accumulator.AUDIO: ['snd1', 'snd2'], - event_accumulator.HISTOGRAMS: ['hst1', 'hst2'], - event_accumulator.COMPRESSED_HISTOGRAMS: ['cmphst1', 'cmphst2'], - event_accumulator.SCALARS: ['sv1', 'sv2']} - - def FirstEventTimestamp(self): - return 0 - - def _TagHelper(self, tag_name, enum): - if tag_name not in self.Tags()[enum]: - raise KeyError - return ['%s/%s' % (self._path, tag_name)] - - def Scalars(self, tag_name): - return self._TagHelper(tag_name, event_accumulator.SCALARS) - - def HealthPills(self, node_name): - if node_name not in self._node_names_to_health_pills: - raise KeyError - health_pills = self._node_names_to_health_pills[node_name] - return [self._path + '/' + health_pill for health_pill in health_pills] - - def Histograms(self, tag_name): - return self._TagHelper(tag_name, event_accumulator.HISTOGRAMS) - - def CompressedHistograms(self, tag_name): - return self._TagHelper(tag_name, event_accumulator.COMPRESSED_HISTOGRAMS) - - def Images(self, tag_name): - return self._TagHelper(tag_name, event_accumulator.IMAGES) - - def Audio(self, tag_name): - return self._TagHelper(tag_name, event_accumulator.AUDIO) - - def Tensors(self, tag_name): - return self._TagHelper(tag_name, event_accumulator.TENSORS) - - def Reload(self): - self.reload_called = True - - -# pylint: disable=unused-argument -def _GetFakeAccumulator( - path, - size_guidance=None, - compression_bps=None, - purge_orphaned_data=None): - return _FakeAccumulator(path) -# pylint: enable=unused-argument - - -class EventMultiplexerTest(test_util.TensorFlowTestCase): - - def setUp(self): - super(EventMultiplexerTest, self).setUp() - self.stubs = googletest.StubOutForTesting() - - self.stubs.Set(event_accumulator, 'EventAccumulator', _GetFakeAccumulator) - - def tearDown(self): - self.stubs.CleanUp() - - def testEmptyLoader(self): - x = event_multiplexer.EventMultiplexer() - self.assertEqual(x.Runs(), {}) - - def testRunNamesRespected(self): - x = event_multiplexer.EventMultiplexer({'run1': 'path1', 'run2': 'path2'}) - self.assertItemsEqual(sorted(x.Runs().keys()), ['run1', 'run2']) - self.assertEqual(x._GetAccumulator('run1')._path, 'path1') - self.assertEqual(x._GetAccumulator('run2')._path, 'path2') - - def testReload(self): - x = event_multiplexer.EventMultiplexer({'run1': 'path1', 'run2': 'path2'}) - self.assertFalse(x._GetAccumulator('run1').reload_called) - self.assertFalse(x._GetAccumulator('run2').reload_called) - x.Reload() - 
self.assertTrue(x._GetAccumulator('run1').reload_called) - self.assertTrue(x._GetAccumulator('run2').reload_called) - - def testScalars(self): - x = event_multiplexer.EventMultiplexer({'run1': 'path1', 'run2': 'path2'}) - - run1_actual = x.Scalars('run1', 'sv1') - run1_expected = ['path1/sv1'] - - self.assertEqual(run1_expected, run1_actual) - - def testHealthPills(self): - x = event_multiplexer.EventMultiplexer({'run1': 'path1', 'run2': 'path2'}) - self.assertEqual(['path1/hp1', 'path1/hp2'], x.HealthPills('run1', 'Add')) - - def testExceptions(self): - x = event_multiplexer.EventMultiplexer({'run1': 'path1', 'run2': 'path2'}) - with self.assertRaises(KeyError): - x.Scalars('sv1', 'xxx') - - def testInitialization(self): - x = event_multiplexer.EventMultiplexer() - self.assertEqual(x.Runs(), {}) - x = event_multiplexer.EventMultiplexer({'run1': 'path1', 'run2': 'path2'}) - self.assertItemsEqual(x.Runs(), ['run1', 'run2']) - self.assertEqual(x._GetAccumulator('run1')._path, 'path1') - self.assertEqual(x._GetAccumulator('run2')._path, 'path2') - - def testAddRunsFromDirectory(self): - x = event_multiplexer.EventMultiplexer() - tmpdir = self.get_temp_dir() - join = os.path.join - fakedir = join(tmpdir, 'fake_accumulator_directory') - realdir = join(tmpdir, 'real_accumulator_directory') - self.assertEqual(x.Runs(), {}) - x.AddRunsFromDirectory(fakedir) - self.assertEqual(x.Runs(), {}, 'loading fakedir had no effect') - - _CreateCleanDirectory(realdir) - x.AddRunsFromDirectory(realdir) - self.assertEqual(x.Runs(), {}, 'loading empty directory had no effect') - - path1 = join(realdir, 'path1') - gfile.MkDir(path1) - x.AddRunsFromDirectory(realdir) - self.assertEqual(x.Runs(), {}, 'creating empty subdirectory had no effect') - - _AddEvents(path1) - x.AddRunsFromDirectory(realdir) - self.assertItemsEqual(x.Runs(), ['path1'], 'loaded run: path1') - loader1 = x._GetAccumulator('path1') - self.assertEqual(loader1._path, path1, 'has the correct path') - - path2 = join(realdir, 'path2') - _AddEvents(path2) - x.AddRunsFromDirectory(realdir) - self.assertItemsEqual(x.Runs(), ['path1', 'path2']) - self.assertEqual( - x._GetAccumulator('path1'), loader1, 'loader1 not regenerated') - - path2_2 = join(path2, 'path2') - _AddEvents(path2_2) - x.AddRunsFromDirectory(realdir) - self.assertItemsEqual(x.Runs(), ['path1', 'path2', 'path2/path2']) - self.assertEqual( - x._GetAccumulator('path2/path2')._path, path2_2, 'loader2 path correct') - - def testAddRunsFromDirectoryThatContainsEvents(self): - x = event_multiplexer.EventMultiplexer() - tmpdir = self.get_temp_dir() - join = os.path.join - realdir = join(tmpdir, 'event_containing_directory') - - _CreateCleanDirectory(realdir) - - self.assertEqual(x.Runs(), {}) - - _AddEvents(realdir) - x.AddRunsFromDirectory(realdir) - self.assertItemsEqual(x.Runs(), ['.']) - - subdir = join(realdir, 'subdir') - _AddEvents(subdir) - x.AddRunsFromDirectory(realdir) - self.assertItemsEqual(x.Runs(), ['.', 'subdir']) - - def testAddRunsFromDirectoryWithRunNames(self): - x = event_multiplexer.EventMultiplexer() - tmpdir = self.get_temp_dir() - join = os.path.join - realdir = join(tmpdir, 'event_containing_directory') - - _CreateCleanDirectory(realdir) - - self.assertEqual(x.Runs(), {}) - - _AddEvents(realdir) - x.AddRunsFromDirectory(realdir, 'foo') - self.assertItemsEqual(x.Runs(), ['foo/.']) - - subdir = join(realdir, 'subdir') - _AddEvents(subdir) - x.AddRunsFromDirectory(realdir, 'foo') - self.assertItemsEqual(x.Runs(), ['foo/.', 'foo/subdir']) - - def 
testAddRunsFromDirectoryWalksTree(self): - x = event_multiplexer.EventMultiplexer() - tmpdir = self.get_temp_dir() - join = os.path.join - realdir = join(tmpdir, 'event_containing_directory') - - _CreateCleanDirectory(realdir) - _AddEvents(realdir) - sub = join(realdir, 'subdirectory') - sub1 = join(sub, '1') - sub2 = join(sub, '2') - sub1_1 = join(sub1, '1') - _AddEvents(sub1) - _AddEvents(sub2) - _AddEvents(sub1_1) - x.AddRunsFromDirectory(realdir) - - self.assertItemsEqual(x.Runs(), ['.', 'subdirectory/1', 'subdirectory/2', - 'subdirectory/1/1']) - - def testAddRunsFromDirectoryThrowsException(self): - x = event_multiplexer.EventMultiplexer() - tmpdir = self.get_temp_dir() - - filepath = _AddEvents(tmpdir) - with self.assertRaises(ValueError): - x.AddRunsFromDirectory(filepath) - - def testAddRun(self): - x = event_multiplexer.EventMultiplexer() - x.AddRun('run1_path', 'run1') - run1 = x._GetAccumulator('run1') - self.assertEqual(sorted(x.Runs().keys()), ['run1']) - self.assertEqual(run1._path, 'run1_path') - - x.AddRun('run1_path', 'run1') - self.assertEqual(run1, x._GetAccumulator('run1'), 'loader not recreated') - - x.AddRun('run2_path', 'run1') - new_run1 = x._GetAccumulator('run1') - self.assertEqual(new_run1._path, 'run2_path') - self.assertNotEqual(run1, new_run1) - - x.AddRun('runName3') - self.assertItemsEqual(sorted(x.Runs().keys()), ['run1', 'runName3']) - self.assertEqual(x._GetAccumulator('runName3')._path, 'runName3') - - def testAddRunMaintainsLoading(self): - x = event_multiplexer.EventMultiplexer() - x.Reload() - x.AddRun('run1') - x.AddRun('run2') - self.assertTrue(x._GetAccumulator('run1').reload_called) - self.assertTrue(x._GetAccumulator('run2').reload_called) - - -class EventMultiplexerWithRealAccumulatorTest(test_util.TensorFlowTestCase): - - def testDeletingDirectoryRemovesRun(self): - x = event_multiplexer.EventMultiplexer() - tmpdir = self.get_temp_dir() - join = os.path.join - run1_dir = join(tmpdir, 'run1') - run2_dir = join(tmpdir, 'run2') - run3_dir = join(tmpdir, 'run3') - - for dirname in [run1_dir, run2_dir, run3_dir]: - _AddEvents(dirname) - - x.AddRun(run1_dir, 'run1') - x.AddRun(run2_dir, 'run2') - x.AddRun(run3_dir, 'run3') - - x.Reload() - - # Delete the directory, then reload. - shutil.rmtree(run2_dir) - x.Reload() - self.assertNotIn('run2', x.Runs().keys()) - - -if __name__ == '__main__': - googletest.main() diff --git a/tensorflow/python/summary/impl/__init__.py b/tensorflow/python/summary/impl/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 --- a/tensorflow/python/summary/impl/__init__.py +++ /dev/null diff --git a/tensorflow/python/summary/impl/directory_watcher.py b/tensorflow/python/summary/impl/directory_watcher.py deleted file mode 100644 index 799e01a836..0000000000 --- a/tensorflow/python/summary/impl/directory_watcher.py +++ /dev/null @@ -1,254 +0,0 @@ -# Copyright 2015 The TensorFlow Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# ==============================================================================
-
-"""Contains the implementation for the DirectoryWatcher class."""
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-import bisect
-
-from tensorflow.python.framework import errors
-from tensorflow.python.platform import gfile
-from tensorflow.python.platform import tf_logging as logging
-from tensorflow.python.summary.impl import io_wrapper
-
-
-class DirectoryWatcher(object):
-  """A DirectoryWatcher wraps a loader to load from a sequence of paths.
-
-  A loader reads a path and produces some kind of values as an iterator. A
-  DirectoryWatcher takes a directory, a factory for loaders, and optionally a
-  path filter, and watches all the paths inside that directory.
-
-  This class is only valid under the assumption that only one path will be
-  written to by the data source at a time, and that once the source stops
-  writing to a path, it will start writing to a new path that's
-  lexicographically greater and never come back. It uses some heuristics to
-  check whether this is true based on tracking changes to the files' sizes,
-  but the check can have false negatives. However, it should have no false
-  positives.
-  """
-
-  def __init__(self, directory, loader_factory, path_filter=lambda x: True):
-    """Constructs a new DirectoryWatcher.
-
-    Args:
-      directory: The directory to load files from.
-      loader_factory: A factory for creating loaders. The factory should take a
-        path and return an object that has a Load method returning an
-        iterator that will yield all events that have not been yielded yet.
-      path_filter: If specified, only paths matching this filter are loaded.
-
-    Raises:
-      ValueError: If directory or loader_factory is None.
-    """
-    if directory is None:
-      raise ValueError('A directory is required')
-    if loader_factory is None:
-      raise ValueError('A loader factory is required')
-    self._directory = directory
-    self._path = None
-    self._loader_factory = loader_factory
-    self._loader = None
-    self._path_filter = path_filter
-    self._ooo_writes_detected = False
-    # The file size for each file at the time it was finalized.
-    self._finalized_sizes = {}
-
-  def Load(self):
-    """Loads new values.
-
-    The watcher will load from one path at a time; as soon as that path stops
-    yielding events, it will move on to the next path. We assume that old paths
-    are never modified after a newer path has been written. As a result, Load()
-    can be called multiple times in a row without losing events that have not
-    been yielded yet. In other words, we guarantee that every event will be
-    yielded exactly once.
-
-    Yields:
-      All values that have not been yielded yet.
-
-    Raises:
-      DirectoryDeletedError: If the directory has been permanently deleted
-        (as opposed to being temporarily unavailable).
-    """
-    try:
-      for event in self._LoadInternal():
-        yield event
-    except errors.OpError:
-      if not gfile.Exists(self._directory):
-        raise DirectoryDeletedError(
-            'Directory %s has been permanently deleted' % self._directory)
-
-  def _LoadInternal(self):
-    """Internal implementation of Load().
-
-    The only difference between this and Load() is that the latter will throw
-    DirectoryDeletedError on I/O errors if it thinks that the directory has
-    been permanently deleted.
-
-    Yields:
-      All values that have not been yielded yet.
-    """
-
-    # Create the loader for the first path if it doesn't exist yet.
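(For orientation before the method body resumes: typical use of this class looks like the following minimal sketch. `LineLoader` and the watched directory are hypothetical; the loader yields new lines from one file, tracking a byte offset so repeated `Load()` calls resume where they left off. The unit tests further down use an analogous `_ByteLoader`.)

```python
from tensorflow.python.summary.impl import directory_watcher

class LineLoader(object):
  """Toy loader: yields lines of one file that haven't been yielded yet."""

  def __init__(self, path):
    self._path = path
    self._offset = 0

  def Load(self):
    with open(self._path) as f:
      f.seek(self._offset)
      while True:
        line = f.readline()
        if not line:
          return
        self._offset = f.tell()
        yield line.rstrip('\n')

watcher = directory_watcher.DirectoryWatcher('/tmp/logs', LineLoader)
for value in watcher.Load():  # walks files in lexicographic order
  print(value)
```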
- if not self._loader: - self._InitializeLoader() - - while True: - # Yield all the new events in the path we're currently loading from. - for event in self._loader.Load(): - yield event - - next_path = self._GetNextPath() - if not next_path: - logging.info('No path found after %s', self._path) - # Current path is empty and there are no new paths, so we're done. - return - - # There's a new path, so check to make sure there weren't any events - # written between when we finished reading the current path and when we - # checked for the new one. The sequence of events might look something - # like this: - # - # 1. Event #1 written to path #1. - # 2. We check for events and yield event #1 from path #1 - # 3. We check for events and see that there are no more events in path #1. - # 4. Event #2 is written to path #1. - # 5. Event #3 is written to path #2. - # 6. We check for a new path and see that path #2 exists. - # - # Without this loop, we would miss event #2. We're also guaranteed by the - # loader contract that no more events will be written to path #1 after - # events start being written to path #2, so we don't have to worry about - # that. - for event in self._loader.Load(): - yield event - - logging.info('Directory watcher advancing from %s to %s', self._path, - next_path) - - # Advance to the next path and start over. - self._SetPath(next_path) - - # The number of paths before the current one to check for out of order writes. - _OOO_WRITE_CHECK_COUNT = 20 - - def OutOfOrderWritesDetected(self): - """Returns whether any out-of-order writes have been detected. - - Out-of-order writes are only checked as part of the Load() iterator. Once an - out-of-order write is detected, this function will always return true. - - Note that out-of-order write detection is not performed on GCS paths, so - this function will always return false. - - Returns: - Whether any out-of-order write has ever been detected by this watcher. - - """ - return self._ooo_writes_detected - - def _InitializeLoader(self): - path = self._GetNextPath() - if path: - self._SetPath(path) - else: - raise StopIteration - - def _SetPath(self, path): - """Sets the current path to watch for new events. - - This also records the size of the old path, if any. If the size can't be - found, an error is logged. - - Args: - path: The full path of the file to watch. - """ - old_path = self._path - if old_path and not io_wrapper.IsGCSPath(old_path): - try: - # We're done with the path, so store its size. - size = gfile.Stat(old_path).length - logging.debug('Setting latest size of %s to %d', old_path, size) - self._finalized_sizes[old_path] = size - except errors.OpError as e: - logging.error('Unable to get size of %s: %s', old_path, e) - - self._path = path - self._loader = self._loader_factory(path) - - def _GetNextPath(self): - """Gets the next path to load from. - - This function also does the checking for out-of-order writes as it iterates - through the paths. - - Returns: - The next path to load events from, or None if there are no more paths. - """ - paths = sorted(path - for path in io_wrapper.ListDirectoryAbsolute(self._directory) - if self._path_filter(path)) - if not paths: - return None - - if self._path is None: - return paths[0] - - # Don't bother checking if the paths are GCS (which we can't check) or if - # we've already detected an OOO write. - if not io_wrapper.IsGCSPath(paths[0]) and not self._ooo_writes_detected: - # Check the previous _OOO_WRITE_CHECK_COUNT paths for out of order writes. 
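(As a toy illustration of the windowing implemented just below, with made-up file names: `bisect` locates the current path in the sorted list, and the check covers at most `_OOO_WRITE_CHECK_COUNT` of its lexicographic predecessors.)

```python
import bisect

paths = ['events.001', 'events.002', 'events.003', 'events.005']
current_path = 'events.003'
check_count = 2  # stands in for _OOO_WRITE_CHECK_COUNT

idx = bisect.bisect_left(paths, current_path)  # index of the current path
window = paths[max(0, idx - check_count):idx]  # predecessors to re-check
print(window)  # ['events.001', 'events.002']
```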
- current_path_index = bisect.bisect_left(paths, self._path) - ooo_check_start = max(0, current_path_index - self._OOO_WRITE_CHECK_COUNT) - for path in paths[ooo_check_start:current_path_index]: - if self._HasOOOWrite(path): - self._ooo_writes_detected = True - break - - next_paths = list(path - for path in paths - if self._path is None or path > self._path) - if next_paths: - return min(next_paths) - else: - return None - - def _HasOOOWrite(self, path): - """Returns whether the path has had an out-of-order write.""" - # Check the sizes of each path before the current one. - size = gfile.Stat(path).length - old_size = self._finalized_sizes.get(path, None) - if size != old_size: - if old_size is None: - logging.error('File %s created after file %s even though it\'s ' - 'lexicographically earlier', path, self._path) - else: - logging.error('File %s updated even though the current file is %s', - path, self._path) - return True - else: - return False - - -class DirectoryDeletedError(Exception): - """Thrown by Load() when the directory is *permanently* gone. - - We distinguish this from temporary errors so that other code can decide to - drop all of our data only when a directory has been intentionally deleted, - as opposed to due to transient filesystem errors. - """ - pass diff --git a/tensorflow/python/summary/impl/directory_watcher_test.py b/tensorflow/python/summary/impl/directory_watcher_test.py deleted file mode 100644 index b6ecc15849..0000000000 --- a/tensorflow/python/summary/impl/directory_watcher_test.py +++ /dev/null @@ -1,209 +0,0 @@ -# Copyright 2015 The TensorFlow Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================== - -"""Tests for directory_watcher.""" - -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import os -import shutil - -from tensorflow.python.framework import test_util -from tensorflow.python.platform import gfile -from tensorflow.python.platform import googletest -from tensorflow.python.summary.impl import directory_watcher -from tensorflow.python.summary.impl import io_wrapper - - -class _ByteLoader(object): - """A loader that loads individual bytes from a file.""" - - def __init__(self, path): - self._f = open(path) - self.bytes_read = 0 - - def Load(self): - while True: - self._f.seek(self.bytes_read) - byte = self._f.read(1) - if byte: - self.bytes_read += 1 - yield byte - else: - return - - -class DirectoryWatcherTest(test_util.TensorFlowTestCase): - - def setUp(self): - # Put everything in a directory so it's easier to delete. - self._directory = os.path.join(self.get_temp_dir(), 'monitor_dir') - os.mkdir(self._directory) - self._watcher = directory_watcher.DirectoryWatcher(self._directory, - _ByteLoader) - self.stubs = googletest.StubOutForTesting() - - def tearDown(self): - self.stubs.CleanUp() - try: - shutil.rmtree(self._directory) - except OSError: - # Some tests delete the directory. 
diff --git a/tensorflow/python/summary/impl/directory_watcher_test.py b/tensorflow/python/summary/impl/directory_watcher_test.py
deleted file mode 100644
index b6ecc15849..0000000000
--- a/tensorflow/python/summary/impl/directory_watcher_test.py
+++ /dev/null
@@ -1,209 +0,0 @@
-# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ==============================================================================
-
-"""Tests for directory_watcher."""
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-import os
-import shutil
-
-from tensorflow.python.framework import test_util
-from tensorflow.python.platform import gfile
-from tensorflow.python.platform import googletest
-from tensorflow.python.summary.impl import directory_watcher
-from tensorflow.python.summary.impl import io_wrapper
-
-
-class _ByteLoader(object):
-  """A loader that loads individual bytes from a file."""
-
-  def __init__(self, path):
-    self._f = open(path)
-    self.bytes_read = 0
-
-  def Load(self):
-    while True:
-      self._f.seek(self.bytes_read)
-      byte = self._f.read(1)
-      if byte:
-        self.bytes_read += 1
-        yield byte
-      else:
-        return
-
-
-class DirectoryWatcherTest(test_util.TensorFlowTestCase):
-
-  def setUp(self):
-    # Put everything in a directory so it's easier to delete.
-    self._directory = os.path.join(self.get_temp_dir(), 'monitor_dir')
-    os.mkdir(self._directory)
-    self._watcher = directory_watcher.DirectoryWatcher(self._directory,
-                                                       _ByteLoader)
-    self.stubs = googletest.StubOutForTesting()
-
-  def tearDown(self):
-    self.stubs.CleanUp()
-    try:
-      shutil.rmtree(self._directory)
-    except OSError:
-      # Some tests delete the directory.
-      pass
-
-  def _WriteToFile(self, filename, data):
-    path = os.path.join(self._directory, filename)
-    with open(path, 'a') as f:
-      f.write(data)
-
-  def _LoadAllEvents(self):
-    """Loads all events in the watcher."""
-    for _ in self._watcher.Load():
-      pass
-
-  def assertWatcherYields(self, values):
-    self.assertEqual(list(self._watcher.Load()), values)
-
-  def testRaisesWithBadArguments(self):
-    with self.assertRaises(ValueError):
-      directory_watcher.DirectoryWatcher(None, lambda x: None)
-    with self.assertRaises(ValueError):
-      directory_watcher.DirectoryWatcher('dir', None)
-
-  def testEmptyDirectory(self):
-    self.assertWatcherYields([])
-
-  def testSingleWrite(self):
-    self._WriteToFile('a', 'abc')
-    self.assertWatcherYields(['a', 'b', 'c'])
-    self.assertFalse(self._watcher.OutOfOrderWritesDetected())
-
-  def testMultipleWrites(self):
-    self._WriteToFile('a', 'abc')
-    self.assertWatcherYields(['a', 'b', 'c'])
-    self._WriteToFile('a', 'xyz')
-    self.assertWatcherYields(['x', 'y', 'z'])
-    self.assertFalse(self._watcher.OutOfOrderWritesDetected())
-
-  def testMultipleLoads(self):
-    self._WriteToFile('a', 'a')
-    self._watcher.Load()
-    self._watcher.Load()
-    self.assertWatcherYields(['a'])
-    self.assertFalse(self._watcher.OutOfOrderWritesDetected())
-
-  def testMultipleFilesAtOnce(self):
-    self._WriteToFile('b', 'b')
-    self._WriteToFile('a', 'a')
-    self.assertWatcherYields(['a', 'b'])
-    self.assertFalse(self._watcher.OutOfOrderWritesDetected())
-
-  def testFinishesLoadingFileWhenSwitchingToNewFile(self):
-    self._WriteToFile('a', 'a')
-    # Empty the iterator.
-    self.assertEqual(['a'], list(self._watcher.Load()))
-    self._WriteToFile('a', 'b')
-    self._WriteToFile('b', 'c')
-    # The watcher should finish its current file before starting a new one.
-    self.assertWatcherYields(['b', 'c'])
-    self.assertFalse(self._watcher.OutOfOrderWritesDetected())
-
-  def testIntermediateEmptyFiles(self):
-    self._WriteToFile('a', 'a')
-    self._WriteToFile('b', '')
-    self._WriteToFile('c', 'c')
-    self.assertWatcherYields(['a', 'c'])
-    self.assertFalse(self._watcher.OutOfOrderWritesDetected())
-
-  def testPathFilter(self):
-    self._watcher = directory_watcher.DirectoryWatcher(
-        self._directory, _ByteLoader,
-        lambda path: 'do_not_watch_me' not in path)
-
-    self._WriteToFile('a', 'a')
-    self._WriteToFile('do_not_watch_me', 'b')
-    self._WriteToFile('c', 'c')
-    self.assertWatcherYields(['a', 'c'])
-    self.assertFalse(self._watcher.OutOfOrderWritesDetected())
-
-  def testDetectsNewOldFiles(self):
-    self._WriteToFile('b', 'a')
-    self._LoadAllEvents()
-    self._WriteToFile('a', 'a')
-    self._LoadAllEvents()
-    self.assertTrue(self._watcher.OutOfOrderWritesDetected())
-
-  def testIgnoresNewerFiles(self):
-    self._WriteToFile('a', 'a')
-    self._LoadAllEvents()
-    self._WriteToFile('q', 'a')
-    self._LoadAllEvents()
-    self.assertFalse(self._watcher.OutOfOrderWritesDetected())
-
-  def testDetectsChangingOldFiles(self):
-    self._WriteToFile('a', 'a')
-    self._WriteToFile('b', 'a')
-    self._LoadAllEvents()
-    self._WriteToFile('a', 'c')
-    self._LoadAllEvents()
-    self.assertTrue(self._watcher.OutOfOrderWritesDetected())
-
-  def testDoesntCrashWhenFileIsDeleted(self):
-    self._WriteToFile('a', 'a')
-    self._LoadAllEvents()
-    os.remove(os.path.join(self._directory, 'a'))
-    self._WriteToFile('b', 'b')
-    self.assertWatcherYields(['b'])
-
-  def testRaisesRightErrorWhenDirectoryIsDeleted(self):
-    self._WriteToFile('a', 'a')
-    self._LoadAllEvents()
-    shutil.rmtree(self._directory)
-    with self.assertRaises(directory_watcher.DirectoryDeletedError):
-      self._LoadAllEvents()
-
-  def testDoesntRaiseDirectoryDeletedErrorIfOutageIsTransient(self):
-    self._WriteToFile('a', 'a')
-    self._LoadAllEvents()
-    shutil.rmtree(self._directory)
-
-    # Fake a single transient I/O error.
-    def FakeFactory(original):
-
-      def Fake(*args, **kwargs):
-        if FakeFactory.has_been_called:
-          original(*args, **kwargs)
-        else:
-          raise OSError('lp0 temporarily on fire')
-
-      return Fake
-
-    FakeFactory.has_been_called = False
-
-    for stub_name in ['ListDirectoryAbsolute', 'ListRecursively']:
-      self.stubs.Set(io_wrapper, stub_name,
-                     FakeFactory(getattr(io_wrapper, stub_name)))
-    for stub_name in ['IsDirectory', 'Exists', 'Stat']:
-      self.stubs.Set(gfile, stub_name,
-                     FakeFactory(getattr(gfile, stub_name)))
-
-    with self.assertRaises((IOError, OSError)):
-      self._LoadAllEvents()
-
-
-if __name__ == '__main__':
-  googletest.main()
diff --git a/tensorflow/python/summary/impl/event_file_loader.py b/tensorflow/python/summary/impl/event_file_loader.py
deleted file mode 100644
index ccc61d4564..0000000000
--- a/tensorflow/python/summary/impl/event_file_loader.py
+++ /dev/null
@@ -1,80 +0,0 @@
-# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ==============================================================================
-
-"""Functionality for loading events from a record file."""
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-from tensorflow.core.util import event_pb2
-from tensorflow.python import pywrap_tensorflow
-from tensorflow.python.framework import errors
-from tensorflow.python.platform import app
-from tensorflow.python.platform import resource_loader
-from tensorflow.python.platform import tf_logging as logging
-from tensorflow.python.util import compat
-
-
-class EventFileLoader(object):
-  """An EventFileLoader is an iterator that yields Event protos."""
-
-  def __init__(self, file_path):
-    if file_path is None:
-      raise ValueError('A file path is required')
-    file_path = resource_loader.readahead_file_path(file_path)
-    logging.debug('Opening a record reader pointing at %s', file_path)
-    with errors.raise_exception_on_not_ok_status() as status:
-      self._reader = pywrap_tensorflow.PyRecordReader_New(
-          compat.as_bytes(file_path), 0, compat.as_bytes(''), status)
-    # Store it for logging purposes.
-    self._file_path = file_path
-    if not self._reader:
-      raise IOError('Failed to open a record reader pointing to %s' % file_path)
-
-  def Load(self):
-    """Loads all new values from disk.
-
-    Calling Load multiple times in a row will not 'drop' events as long as the
-    return value is not iterated over.
-
-    Yields:
-      All values that were written to disk that have not been yielded yet.
-    """
-    while True:
-      try:
-        with errors.raise_exception_on_not_ok_status() as status:
-          self._reader.GetNext(status)
-      except (errors.DataLossError, errors.OutOfRangeError):
-        # We ignore partial read exceptions, because a record may be truncated.
-        # PyRecordReader holds the offset prior to the failed read, so
-        # retrying will succeed.
-        break
-      event = event_pb2.Event()
-      event.ParseFromString(self._reader.record())
-      yield event
-    logging.debug('No more events in %s', self._file_path)
-
-
-def main(argv):
-  if len(argv) != 2:
-    print('Usage: event_file_loader <path-to-the-recordio-file>')
-    return 1
-  loader = EventFileLoader(argv[1])
-  for event in loader.Load():
-    print(event)
-
-
-if __name__ == '__main__':
-  app.run()
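A small usage sketch of the loader (the event-file path is hypothetical; this mirrors the main() function above):

from tensorflow.python.summary.impl import event_file_loader

loader = event_file_loader.EventFileLoader(
    '/tmp/my_run/events.out.tfevents.0.host')  # hypothetical file
print(len(list(loader.Load())))  # all events written so far
# A later Load() call resumes where the previous one stopped, yielding
# only records appended since then:
for event in loader.Load():
  print(event.wall_time)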
diff --git a/tensorflow/python/summary/impl/event_file_loader_test.py b/tensorflow/python/summary/impl/event_file_loader_test.py
deleted file mode 100644
index 0b354d553d..0000000000
--- a/tensorflow/python/summary/impl/event_file_loader_test.py
+++ /dev/null
@@ -1,92 +0,0 @@
-# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ==============================================================================
-
-"""Tests for event_file_loader."""
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-import os
-import tempfile
-
-from tensorflow.python.framework import test_util
-from tensorflow.python.platform import googletest
-from tensorflow.python.summary.impl import event_file_loader
-
-
-class EventFileLoaderTest(test_util.TensorFlowTestCase):
-  # A record containing a simple event.
-  RECORD = (b'\x18\x00\x00\x00\x00\x00\x00\x00\xa3\x7fK"\t\x00\x00\xc0%\xddu'
-            b'\xd5A\x1a\rbrain.Event:1\xec\xf32\x8d')
-
-  def _WriteToFile(self, filename, data):
-    with open(filename, 'ab') as f:
-      f.write(data)
-
-  def _LoaderForTestFile(self, filename):
-    return event_file_loader.EventFileLoader(
-        os.path.join(self.get_temp_dir(), filename))
-
-  def testEmptyEventFile(self):
-    filename = tempfile.NamedTemporaryFile(dir=self.get_temp_dir()).name
-    self._WriteToFile(filename, b'')
-    loader = self._LoaderForTestFile(filename)
-    self.assertEqual(len(list(loader.Load())), 0)
-
-  def testSingleWrite(self):
-    filename = tempfile.NamedTemporaryFile(dir=self.get_temp_dir()).name
-    self._WriteToFile(filename, EventFileLoaderTest.RECORD)
-    loader = self._LoaderForTestFile(filename)
-    events = list(loader.Load())
-    self.assertEqual(len(events), 1)
-    self.assertEqual(events[0].wall_time, 1440183447.0)
-    self.assertEqual(len(list(loader.Load())), 0)
-
-  def testMultipleWrites(self):
-    filename = tempfile.NamedTemporaryFile(dir=self.get_temp_dir()).name
-    self._WriteToFile(filename, EventFileLoaderTest.RECORD)
-    loader = self._LoaderForTestFile(filename)
-    self.assertEqual(len(list(loader.Load())), 1)
-    self._WriteToFile(filename, EventFileLoaderTest.RECORD)
-    self.assertEqual(len(list(loader.Load())), 1)
-
-  def testMultipleLoads(self):
-    filename = tempfile.NamedTemporaryFile(dir=self.get_temp_dir()).name
-    self._WriteToFile(filename, EventFileLoaderTest.RECORD)
-    loader = self._LoaderForTestFile(filename)
-    loader.Load()
-    loader.Load()
-    self.assertEqual(len(list(loader.Load())), 1)
-
-  def testMultipleWritesAtOnce(self):
-    filename = tempfile.NamedTemporaryFile(dir=self.get_temp_dir()).name
-    self._WriteToFile(filename, EventFileLoaderTest.RECORD)
-    self._WriteToFile(filename, EventFileLoaderTest.RECORD)
-    loader = self._LoaderForTestFile(filename)
-    self.assertEqual(len(list(loader.Load())), 2)
-
-  def testMultipleWritesWithBadWrite(self):
-    filename = tempfile.NamedTemporaryFile(dir=self.get_temp_dir()).name
-    self._WriteToFile(filename, EventFileLoaderTest.RECORD)
-    self._WriteToFile(filename, EventFileLoaderTest.RECORD)
-    # Test that we ignore partial record writes at the end of the file.
-    self._WriteToFile(filename, b'123')
-    loader = self._LoaderForTestFile(filename)
-    self.assertEqual(len(list(loader.Load())), 2)
-
-
-if __name__ == '__main__':
-  googletest.main()
diff --git a/tensorflow/python/summary/impl/io_wrapper.py b/tensorflow/python/summary/impl/io_wrapper.py
deleted file mode 100644
index 258fe8c804..0000000000
--- a/tensorflow/python/summary/impl/io_wrapper.py
+++ /dev/null
@@ -1,52 +0,0 @@
-# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ==============================================================================
-"""IO helper functions."""
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-import os
-
-from tensorflow.python.platform import gfile
-
-
-def IsGCSPath(path):
-  return path.startswith("gs://")
-
-
-def ListDirectoryAbsolute(directory):
-  """Yields all files in the given directory. The paths are absolute."""
-  return (os.path.join(directory, path)
-          for path in gfile.ListDirectory(directory))
-
-
-def ListRecursively(top):
-  """Walks a directory tree, yielding (dir_path, file_paths) tuples.
-
-  For each of `top` and its subdirectories, yields a tuple containing the path
-  to the directory and the path to each of the contained files. Note that
-  unlike os.walk()/gfile.Walk(), this does not list subdirectories and the
-  file paths are all absolute.
-
-  If the directory does not exist, this yields nothing.
-
-  Args:
-    top: A path to a directory.
-  Yields:
-    (dir_path, file_paths) tuples.
-  """
-  for dir_path, _, filenames in gfile.Walk(top):
-    yield (dir_path, (os.path.join(dir_path, filename)
-                      for filename in filenames))
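For example, ListRecursively can be consumed like this (a sketch; the log directory is hypothetical):

from tensorflow.python.summary.impl import io_wrapper

# Print every file under /tmp/logs, grouped by its containing directory.
# Note that file_paths is itself a generator of absolute paths.
for dir_path, file_paths in io_wrapper.ListRecursively('/tmp/logs'):
  for file_path in file_paths:
    print(dir_path, file_path)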
diff --git a/tensorflow/python/summary/impl/reservoir.py b/tensorflow/python/summary/impl/reservoir.py
deleted file mode 100644
index 0a1252e635..0000000000
--- a/tensorflow/python/summary/impl/reservoir.py
+++ /dev/null
@@ -1,253 +0,0 @@
-# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ==============================================================================
-
-"""A key-value[] store that implements reservoir sampling on the values."""
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-import collections
-import random
-import threading
-
-
-class Reservoir(object):
-  """A map-to-arrays container, with deterministic Reservoir Sampling.
-
-  Items are added with an associated key. Items may be retrieved by key, and
-  a list of keys can also be retrieved. If size is not zero, then it dictates
-  the maximum number of items that will be stored with each key. Once there are
-  more items for a given key, they are replaced via reservoir sampling, such
-  that each item has an equal probability of being included in the sample.
-
-  Deterministic means that for any given seed and bucket size, the sequence of
-  values that are kept for any given tag will always be the same, and that this
-  is independent of any insertions on other tags. That is:
-
-  >>> separate_reservoir = reservoir.Reservoir(10)
-  >>> interleaved_reservoir = reservoir.Reservoir(10)
-  >>> for i in xrange(100):
-  >>>   separate_reservoir.AddItem('key1', i)
-  >>> for i in xrange(100):
-  >>>   separate_reservoir.AddItem('key2', i)
-  >>> for i in xrange(100):
-  >>>   interleaved_reservoir.AddItem('key1', i)
-  >>>   interleaved_reservoir.AddItem('key2', i)
-
-  separate_reservoir and interleaved_reservoir will be in identical states.
-
-  See: https://en.wikipedia.org/wiki/Reservoir_sampling
-
-  Adding items has amortized O(1) runtime.
-  """
-
-  def __init__(self, size, seed=0, always_keep_last=True):
-    """Creates a new reservoir.
-
-    Args:
-      size: The number of values to keep in the reservoir for each tag. If 0,
-        all values will be kept.
-      seed: The seed of the random number generator to use when sampling.
-        Different values for |seed| will produce different samples from the
-        same input items.
-      always_keep_last: Whether to always keep the latest seen item at the
-        end of the reservoir. Defaults to True.
-
-    Raises:
-      ValueError: If size is negative or not an integer.
-    """
-    if size < 0 or size != round(size):
-      raise ValueError('size must be a nonnegative integer, was %s' % size)
-    self._buckets = collections.defaultdict(
-        lambda: _ReservoirBucket(size, random.Random(seed), always_keep_last))
-    # _mutex guards the keys - creating new keys, retrieving by key, etc.
-    # The internal items are guarded by the ReservoirBuckets' internal mutexes.
-    self._mutex = threading.Lock()
-
-  def Keys(self):
-    """Return all the keys in the reservoir.
-
-    Returns:
-      ['list', 'of', 'keys'] in the Reservoir.
-    """
-    with self._mutex:
-      return list(self._buckets.keys())
-
-  def Items(self, key):
-    """Return items associated with given key.
-
-    Args:
-      key: The key for which we are finding associated items.
-
-    Raises:
-      KeyError: If the key is not found in the reservoir.
-
-    Returns:
-      [list, of, items] associated with that key.
-    """
-    with self._mutex:
-      if key not in self._buckets:
-        raise KeyError('Key %s was not found in Reservoir' % key)
-      bucket = self._buckets[key]
-    return bucket.Items()
-
-  def AddItem(self, key, item, f=lambda x: x):
-    """Add a new item to the Reservoir with the given tag.
-
-    If the reservoir has not yet reached full size, the new item is guaranteed
-    to be added. If the reservoir is full, then behavior depends on the
-    always_keep_last boolean.
-
-    If always_keep_last was set to true, the new item is guaranteed to be added
-    to the reservoir, and either the previous last item will be replaced, or
-    (with low probability) an older item will be replaced.
-
-    If always_keep_last was set to false, then the new item will replace an
-    old item with low probability.
-
-    If f is provided, it will be applied to transform item (lazily, iff item is
-    going to be included in the reservoir).
-
-    Args:
-      key: The key to store the item under.
-      item: The item to add to the reservoir.
-      f: An optional function to transform the item prior to addition.
-    """
-    with self._mutex:
-      bucket = self._buckets[key]
-    bucket.AddItem(item, f)
-
-  def FilterItems(self, filterFn, key=None):
-    """Filter items within a Reservoir, using a filtering function.
-
-    Args:
-      filterFn: A function that returns True for the items to be kept.
-      key: An optional bucket key to filter. If not specified, will filter all
-        buckets.
-
-    Returns:
-      The number of items removed.
-    """
-    with self._mutex:
-      if key:
-        if key in self._buckets:
-          return self._buckets[key].FilterItems(filterFn)
-        else:
-          return 0
-      else:
-        return sum(bucket.FilterItems(filterFn)
-                   for bucket in self._buckets.values())
-
-
-class _ReservoirBucket(object):
-  """A container for items from a stream, that implements reservoir sampling.
-
-  It always stores the most recent item as its final item.
-  """
-
-  def __init__(self, _max_size, _random=None, always_keep_last=True):
-    """Create the _ReservoirBucket.
-
-    Args:
-      _max_size: The maximum size the reservoir bucket may grow to. If size is
-        zero, the bucket has unbounded size.
-      _random: The random number generator to use. If not specified, defaults
-        to random.Random(0).
-      always_keep_last: Whether the latest seen item should always be included
-        at the end of the bucket.
-
-    Raises:
-      ValueError: if the size is not a nonnegative integer.
-    """
-    if _max_size < 0 or _max_size != round(_max_size):
-      raise ValueError('_max_size must be a nonnegative int, was %s'
-                       % _max_size)
-    self.items = []
-    # This mutex protects the internal items, ensuring that calls to Items and
-    # AddItem are thread-safe.
-    self._mutex = threading.Lock()
-    self._max_size = _max_size
-    self._num_items_seen = 0
-    if _random is not None:
-      self._random = _random
-    else:
-      self._random = random.Random(0)
-    self.always_keep_last = always_keep_last
-
-  def AddItem(self, item, f=lambda x: x):
-    """Add an item to the ReservoirBucket, replacing an old item if necessary.
-
-    The new item is guaranteed to be added to the bucket, and to be the last
-    element in the bucket. If the bucket has reached capacity, then an old item
-    will be replaced. With probability (_max_size/_num_items_seen) a random
-    item in the bucket will be popped out and the new item will be appended to
-    the end. With probability (1 - _max_size/_num_items_seen) the last item in
-    the bucket will be replaced.
-
-    Since the O(n) replacements occur with O(1/_num_items_seen) likelihood,
-    the amortized runtime is O(1).
-
-    Args:
-      item: The item to add to the bucket.
-      f: A function to transform item before addition, if it will be kept in
-        the reservoir.
-    """
-    with self._mutex:
-      if len(self.items) < self._max_size or self._max_size == 0:
-        self.items.append(f(item))
-      else:
-        r = self._random.randint(0, self._num_items_seen)
-        if r < self._max_size:
-          self.items.pop(r)
-          self.items.append(f(item))
-        elif self.always_keep_last:
-          self.items[-1] = f(item)
-      self._num_items_seen += 1
-
-  def FilterItems(self, filterFn):
-    """Filter items in a ReservoirBucket, using a filtering function.
-
-    Filtering items from the reservoir bucket must update the internal state
-    variable self._num_items_seen, which is used for determining the rate of
-    replacement in reservoir sampling. Ideally, self._num_items_seen would
-    contain the exact number of items that have ever been seen by the
-    ReservoirBucket and satisfy filterFn. However, the ReservoirBucket does not
-    have access to all items seen -- it only has access to the subset of items
-    that have survived sampling (self.items). Therefore, we estimate
-    self._num_items_seen by scaling it by the same ratio as the ratio of items
-    not removed from self.items.
-
-    Args:
-      filterFn: A function that returns True for items to be kept.
-
-    Returns:
-      The number of items removed from the bucket.
- """ - with self._mutex: - size_before = len(self.items) - self.items = list(filter(filterFn, self.items)) - size_diff = size_before - len(self.items) - - # Estimate a correction the number of items seen - prop_remaining = len(self.items) / float( - size_before) if size_before > 0 else 0 - self._num_items_seen = int(round(self._num_items_seen * prop_remaining)) - return size_diff - - def Items(self): - """Get all the items in the bucket.""" - with self._mutex: - return list(self.items) diff --git a/tensorflow/python/summary/impl/reservoir_test.py b/tensorflow/python/summary/impl/reservoir_test.py deleted file mode 100644 index c526d64f3d..0000000000 --- a/tensorflow/python/summary/impl/reservoir_test.py +++ /dev/null @@ -1,279 +0,0 @@ -# Copyright 2015 The TensorFlow Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================== - -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -from six.moves import xrange # pylint: disable=redefined-builtin - -from tensorflow.python.platform import test -from tensorflow.python.summary.impl import reservoir - - -class ReservoirTest(test.TestCase): - - def testEmptyReservoir(self): - r = reservoir.Reservoir(1) - self.assertFalse(r.Keys()) - - def testRespectsSize(self): - r = reservoir.Reservoir(42) - self.assertEqual(r._buckets['meaning of life']._max_size, 42) - - def testItemsAndKeys(self): - r = reservoir.Reservoir(42) - r.AddItem('foo', 4) - r.AddItem('bar', 9) - r.AddItem('foo', 19) - self.assertItemsEqual(r.Keys(), ['foo', 'bar']) - self.assertEqual(r.Items('foo'), [4, 19]) - self.assertEqual(r.Items('bar'), [9]) - - def testExceptions(self): - with self.assertRaises(ValueError): - reservoir.Reservoir(-1) - with self.assertRaises(ValueError): - reservoir.Reservoir(13.3) - - r = reservoir.Reservoir(12) - with self.assertRaises(KeyError): - r.Items('missing key') - - def testDeterminism(self): - """Tests that the reservoir is deterministic.""" - key = 'key' - r1 = reservoir.Reservoir(10) - r2 = reservoir.Reservoir(10) - for i in xrange(100): - r1.AddItem('key', i) - r2.AddItem('key', i) - - self.assertEqual(r1.Items(key), r2.Items(key)) - - def testBucketDeterminism(self): - """Tests that reservoirs are deterministic at a bucket level. - - This means that only the order elements are added within a bucket matters. 
- """ - separate_reservoir = reservoir.Reservoir(10) - interleaved_reservoir = reservoir.Reservoir(10) - for i in xrange(100): - separate_reservoir.AddItem('key1', i) - for i in xrange(100): - separate_reservoir.AddItem('key2', i) - for i in xrange(100): - interleaved_reservoir.AddItem('key1', i) - interleaved_reservoir.AddItem('key2', i) - - for key in ['key1', 'key2']: - self.assertEqual( - separate_reservoir.Items(key), interleaved_reservoir.Items(key)) - - def testUsesSeed(self): - """Tests that reservoirs with different seeds keep different samples.""" - key = 'key' - r1 = reservoir.Reservoir(10, seed=0) - r2 = reservoir.Reservoir(10, seed=1) - for i in xrange(100): - r1.AddItem('key', i) - r2.AddItem('key', i) - self.assertNotEqual(r1.Items(key), r2.Items(key)) - - def testFilterItemsByKey(self): - r = reservoir.Reservoir(100, seed=0) - for i in xrange(10): - r.AddItem('key1', i) - r.AddItem('key2', i) - - self.assertEqual(len(r.Items('key1')), 10) - self.assertEqual(len(r.Items('key2')), 10) - - self.assertEqual(r.FilterItems(lambda x: x <= 7, 'key2'), 2) - self.assertEqual(len(r.Items('key2')), 8) - self.assertEqual(len(r.Items('key1')), 10) - - self.assertEqual(r.FilterItems(lambda x: x <= 3, 'key1'), 6) - self.assertEqual(len(r.Items('key1')), 4) - self.assertEqual(len(r.Items('key2')), 8) - - -class ReservoirBucketTest(test.TestCase): - - def testEmptyBucket(self): - b = reservoir._ReservoirBucket(1) - self.assertFalse(b.Items()) - - def testFillToSize(self): - b = reservoir._ReservoirBucket(100) - for i in xrange(100): - b.AddItem(i) - self.assertEqual(b.Items(), list(xrange(100))) - self.assertEqual(b._num_items_seen, 100) - - def testDoesntOverfill(self): - b = reservoir._ReservoirBucket(10) - for i in xrange(1000): - b.AddItem(i) - self.assertEqual(len(b.Items()), 10) - self.assertEqual(b._num_items_seen, 1000) - - def testMaintainsOrder(self): - b = reservoir._ReservoirBucket(100) - for i in xrange(10000): - b.AddItem(i) - items = b.Items() - prev = -1 - for item in items: - self.assertTrue(item > prev) - prev = item - - def testKeepsLatestItem(self): - b = reservoir._ReservoirBucket(5) - for i in xrange(100): - b.AddItem(i) - last = b.Items()[-1] - self.assertEqual(last, i) - - def testSizeOneBucket(self): - b = reservoir._ReservoirBucket(1) - for i in xrange(20): - b.AddItem(i) - self.assertEqual(b.Items(), [i]) - self.assertEqual(b._num_items_seen, 20) - - def testSizeZeroBucket(self): - b = reservoir._ReservoirBucket(0) - for i in xrange(20): - b.AddItem(i) - self.assertEqual(b.Items(), list(range(i + 1))) - self.assertEqual(b._num_items_seen, 20) - - def testSizeRequirement(self): - with self.assertRaises(ValueError): - reservoir._ReservoirBucket(-1) - with self.assertRaises(ValueError): - reservoir._ReservoirBucket(10.3) - - def testRemovesItems(self): - b = reservoir._ReservoirBucket(100) - for i in xrange(10): - b.AddItem(i) - self.assertEqual(len(b.Items()), 10) - self.assertEqual(b._num_items_seen, 10) - self.assertEqual(b.FilterItems(lambda x: x <= 7), 2) - self.assertEqual(len(b.Items()), 8) - self.assertEqual(b._num_items_seen, 8) - - def testRemovesItemsWhenItemsAreReplaced(self): - b = reservoir._ReservoirBucket(100) - for i in xrange(10000): - b.AddItem(i) - self.assertEqual(b._num_items_seen, 10000) - - # Remove items - num_removed = b.FilterItems(lambda x: x <= 7) - self.assertGreater(num_removed, 92) - self.assertEqual([], [item for item in b.Items() if item > 7]) - self.assertEqual(b._num_items_seen, - int(round(10000 * (1 - float(num_removed) / 100)))) 
-
-  def testLazyFunctionEvaluationAndAlwaysKeepLast(self):
-
-    class FakeRandom(object):
-
-      def randint(self, a, b):  # pylint:disable=unused-argument
-        return 999
-
-    class Incrementer(object):
-
-      def __init__(self):
-        self.n = 0
-
-      def increment_and_double(self, x):
-        self.n += 1
-        return x * 2
-
-    # We've mocked the randomness generator, so that once it is full, the last
-    # item will never get durable reservoir inclusion. Since always_keep_last
-    # is false, the function should only get invoked 100 times while filling
-    # up the reservoir. This laziness property is an essential performance
-    # optimization.
-    b = reservoir._ReservoirBucket(100, FakeRandom(), always_keep_last=False)
-    incrementer = Incrementer()
-    for i in xrange(1000):
-      b.AddItem(i, incrementer.increment_and_double)
-    self.assertEqual(incrementer.n, 100)
-    self.assertEqual(b.Items(), [x * 2 for x in xrange(100)])
-
-    # This time, we will always keep the last item, meaning that the function
-    # should get invoked once for every item we add.
-    b = reservoir._ReservoirBucket(100, FakeRandom(), always_keep_last=True)
-    incrementer = Incrementer()
-
-    for i in xrange(1000):
-      b.AddItem(i, incrementer.increment_and_double)
-    self.assertEqual(incrementer.n, 1000)
-    self.assertEqual(b.Items(), [x * 2 for x in xrange(99)] + [999 * 2])
-
-
-class ReservoirBucketStatisticalDistributionTest(test.TestCase):
-
-  def setUp(self):
-    self.total = 1000000
-    self.samples = 10000
-    self.n_buckets = 100
-    self.total_per_bucket = self.total // self.n_buckets
-    self.assertEqual(self.total % self.n_buckets, 0, 'total must be evenly '
-                     'divisible by the number of buckets')
-    self.assertTrue(self.total > self.samples, 'need to have more items '
-                    'than samples')
-
-  def AssertBinomialQuantity(self, measured):
-    p = 1.0 * self.n_buckets / self.samples
-    mean = p * self.samples
-    variance = p * (1 - p) * self.samples
-    error = measured - mean
-    # Given that the buckets were actually binomially distributed, this
-    # fails with probability ~2E-9.
-    passed = error * error <= 36.0 * variance
-    self.assertTrue(passed, 'found a bucket with measured %d '
-                    'too far from expected %d' % (measured, mean))
-
-  def testBucketReservoirSamplingViaStatisticalProperties(self):
-    # Not related to a 'ReservoirBucket', but instead the number of buckets
-    # we put samples into for testing the shape of the distribution.
-    b = reservoir._ReservoirBucket(_max_size=self.samples)
-    # Add one extra item because we always keep the most recent item, which
-    # would skew the distribution; we can just slice it off the end instead.
-    for i in xrange(self.total + 1):
-      b.AddItem(i)
-
-    divbins = [0] * self.n_buckets
-    modbins = [0] * self.n_buckets
-    # Slice off the last item when we iterate.
-    for item in b.Items()[0:-1]:
-      divbins[item // self.total_per_bucket] += 1
-      modbins[item % self.n_buckets] += 1
-
-    for bucket_index in xrange(self.n_buckets):
-      divbin = divbins[bucket_index]
-      modbin = modbins[bucket_index]
-      self.AssertBinomialQuantity(divbin)
-      self.AssertBinomialQuantity(modbin)
-
-
-if __name__ == '__main__':
-  test.main()
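As a sanity check on the error * error <= 36.0 * variance bound above, the constants from setUp work out as follows (a back-of-the-envelope sketch, using only the test's own numbers):

import math

samples = 10000                      # reservoir size used by the test
n_buckets = 100
p = 1.0 * n_buckets / samples        # 0.01, as in AssertBinomialQuantity
mean = p * samples                   # 100 items expected per bucket
variance = p * (1 - p) * samples     # 99.0
max_error = 6 * math.sqrt(variance)  # ~59.7, since 36 = 6 ** 2
print(mean, variance, max_error)
# A bucket count farther than ~6 standard deviations from 100 fails the
# check; under a true binomial distribution that happens with probability
# on the order of 2e-9, matching the comment in AssertBinomialQuantity.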