aboutsummaryrefslogtreecommitdiffhomepage
path: root/tensorflow/python/summary/impl/directory_watcher.py
diff options
context:
space:
mode:
Diffstat (limited to 'tensorflow/python/summary/impl/directory_watcher.py')
-rw-r--r--tensorflow/python/summary/impl/directory_watcher.py115
1 files changed, 115 insertions, 0 deletions
diff --git a/tensorflow/python/summary/impl/directory_watcher.py b/tensorflow/python/summary/impl/directory_watcher.py
new file mode 100644
index 0000000000..830e538cb6
--- /dev/null
+++ b/tensorflow/python/summary/impl/directory_watcher.py
@@ -0,0 +1,115 @@
+"""Contains the implementation for the DirectoryWatcher class."""
+import os
+
+from tensorflow.python.platform import gfile
+from tensorflow.python.platform import logging
+
+
+class DirectoryWatcher(object):
+ """A DirectoryWatcher wraps a loader to load from a directory.
+
+ A loader reads a file on disk and produces some kind of values as an
+ iterator. A DirectoryWatcher takes a directory with one file at a time being
+ written to and a factory for loaders and watches all the files at once.
+
+ This class is *only* valid under the assumption that files are never removed
+ and the only file ever changed is whichever one is lexicographically last.
+ """
+
+ def __init__(self, directory, loader_factory, path_filter=lambda x: True):
+ """Constructs a new DirectoryWatcher.
+
+ Args:
+ directory: The directory to watch. The directory doesn't have to exist.
+ loader_factory: A factory for creating loaders. The factory should take a
+ file path and return an object that has a Load method returning an
+ iterator that will yield all events that have not been yielded yet.
+ path_filter: Only files whose full path matches this predicate will be
+ loaded. If not specified, all files are loaded.
+
+ Raises:
+ ValueError: If directory or loader_factory is None.
+ """
+ if directory is None:
+ raise ValueError('A directory is required')
+ if loader_factory is None:
+ raise ValueError('A loader factory is required')
+ self._directory = directory
+ self._loader_factory = loader_factory
+ self._loader = None
+ self._path = None
+ self._path_filter = path_filter
+
+ def Load(self):
+ """Loads new values from disk.
+
+ The watcher will load from one file at a time; as soon as that file stops
+ yielding events, it will move on to the next file. We assume that old files
+ are never modified after a newer file has been written. As a result, Load()
+ can be called multiple times in a row without losing events that have not
+ been yielded yet. In other words, we guarantee that every event will be
+ yielded exactly once.
+
+ Yields:
+ All values that were written to disk that have not been yielded yet.
+ """
+
+ # If the loader exists, check it for a value.
+ if not self._loader:
+ self._InitializeLoader()
+
+ while True:
+ # Yield all the new events in the file we're currently loading from.
+ for event in self._loader.Load():
+ yield event
+
+ next_path = self._GetNextPath()
+ if not next_path:
+ logging.info('No more files in %s', self._directory)
+ # Current file is empty and there are no new files, so we're done.
+ return
+
+ # There's a new file, so check to make sure there weren't any events
+ # written between when we finished reading the current file and when we
+ # checked for the new one. The sequence of events might look something
+ # like this:
+ #
+ # 1. Event #1 written to file #1.
+ # 2. We check for events and yield event #1 from file #1
+ # 3. We check for events and see that there are no more events in file #1.
+ # 4. Event #2 is written to file #1.
+ # 5. Event #3 is written to file #2.
+ # 6. We check for a new file and see that file #2 exists.
+ #
+ # Without this loop, we would miss event #2. We're also guaranteed by the
+ # loader contract that no more events will be written to file #1 after
+ # events start being written to file #2, so we don't have to worry about
+ # that.
+ for event in self._loader.Load():
+ yield event
+
+ logging.info('Directory watcher for %s advancing to file %s',
+ self._directory, next_path)
+
+ # Advance to the next file and start over.
+ self._SetPath(next_path)
+
+ def _InitializeLoader(self):
+ path = self._GetNextPath()
+ if path:
+ self._SetPath(path)
+ else:
+ raise StopIteration
+
+ def _SetPath(self, path):
+ self._path = path
+ self._loader = self._loader_factory(path)
+
+ def _GetNextPath(self):
+ """Returns the path of the next file to use or None if no file exists."""
+ sorted_paths = [os.path.join(self._directory, path)
+ for path in sorted(gfile.ListDirectory(self._directory))]
+ # We filter here so the filter gets the full directory name.
+ filtered_paths = (path for path in sorted_paths
+ if self._path_filter(path) and path > self._path)
+ return next(filtered_paths, None)