diff options
Diffstat (limited to 'tensorflow/python/summary/impl/directory_watcher.py')
-rw-r--r-- | tensorflow/python/summary/impl/directory_watcher.py | 115 |
1 files changed, 115 insertions, 0 deletions
diff --git a/tensorflow/python/summary/impl/directory_watcher.py b/tensorflow/python/summary/impl/directory_watcher.py new file mode 100644 index 0000000000..830e538cb6 --- /dev/null +++ b/tensorflow/python/summary/impl/directory_watcher.py @@ -0,0 +1,115 @@ +"""Contains the implementation for the DirectoryWatcher class.""" +import os + +from tensorflow.python.platform import gfile +from tensorflow.python.platform import logging + + +class DirectoryWatcher(object): + """A DirectoryWatcher wraps a loader to load from a directory. + + A loader reads a file on disk and produces some kind of values as an + iterator. A DirectoryWatcher takes a directory with one file at a time being + written to and a factory for loaders and watches all the files at once. + + This class is *only* valid under the assumption that files are never removed + and the only file ever changed is whichever one is lexicographically last. + """ + + def __init__(self, directory, loader_factory, path_filter=lambda x: True): + """Constructs a new DirectoryWatcher. + + Args: + directory: The directory to watch. The directory doesn't have to exist. + loader_factory: A factory for creating loaders. The factory should take a + file path and return an object that has a Load method returning an + iterator that will yield all events that have not been yielded yet. + path_filter: Only files whose full path matches this predicate will be + loaded. If not specified, all files are loaded. + + Raises: + ValueError: If directory or loader_factory is None. + """ + if directory is None: + raise ValueError('A directory is required') + if loader_factory is None: + raise ValueError('A loader factory is required') + self._directory = directory + self._loader_factory = loader_factory + self._loader = None + self._path = None + self._path_filter = path_filter + + def Load(self): + """Loads new values from disk. + + The watcher will load from one file at a time; as soon as that file stops + yielding events, it will move on to the next file. We assume that old files + are never modified after a newer file has been written. As a result, Load() + can be called multiple times in a row without losing events that have not + been yielded yet. In other words, we guarantee that every event will be + yielded exactly once. + + Yields: + All values that were written to disk that have not been yielded yet. + """ + + # If the loader exists, check it for a value. + if not self._loader: + self._InitializeLoader() + + while True: + # Yield all the new events in the file we're currently loading from. + for event in self._loader.Load(): + yield event + + next_path = self._GetNextPath() + if not next_path: + logging.info('No more files in %s', self._directory) + # Current file is empty and there are no new files, so we're done. + return + + # There's a new file, so check to make sure there weren't any events + # written between when we finished reading the current file and when we + # checked for the new one. The sequence of events might look something + # like this: + # + # 1. Event #1 written to file #1. + # 2. We check for events and yield event #1 from file #1 + # 3. We check for events and see that there are no more events in file #1. + # 4. Event #2 is written to file #1. + # 5. Event #3 is written to file #2. + # 6. We check for a new file and see that file #2 exists. + # + # Without this loop, we would miss event #2. We're also guaranteed by the + # loader contract that no more events will be written to file #1 after + # events start being written to file #2, so we don't have to worry about + # that. + for event in self._loader.Load(): + yield event + + logging.info('Directory watcher for %s advancing to file %s', + self._directory, next_path) + + # Advance to the next file and start over. + self._SetPath(next_path) + + def _InitializeLoader(self): + path = self._GetNextPath() + if path: + self._SetPath(path) + else: + raise StopIteration + + def _SetPath(self, path): + self._path = path + self._loader = self._loader_factory(path) + + def _GetNextPath(self): + """Returns the path of the next file to use or None if no file exists.""" + sorted_paths = [os.path.join(self._directory, path) + for path in sorted(gfile.ListDirectory(self._directory))] + # We filter here so the filter gets the full directory name. + filtered_paths = (path for path in sorted_paths + if self._path_filter(path) and path > self._path) + return next(filtered_paths, None) |