aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--tensorflow/python/summary/event_accumulator.py6
-rw-r--r--tensorflow/python/summary/impl/directory_watcher.py133
-rw-r--r--tensorflow/python/summary/impl/directory_watcher_test.py12
-rw-r--r--tensorflow/tensorflow.bzl2
4 files changed, 100 insertions, 53 deletions
diff --git a/tensorflow/python/summary/event_accumulator.py b/tensorflow/python/summary/event_accumulator.py
index 3715ca64cb..c605b1b007 100644
--- a/tensorflow/python/summary/event_accumulator.py
+++ b/tensorflow/python/summary/event_accumulator.py
@@ -524,8 +524,10 @@ def _GeneratorFromPath(path):
"""Create an event generator for file or directory at given path string."""
loader_factory = event_file_loader.EventFileLoader
if gfile.IsDirectory(path):
- return directory_watcher.DirectoryWatcher(path, loader_factory,
- IsTensorFlowEventsFile)
+ provider = directory_watcher.SequentialGFileProvider(
+ path,
+ path_filter=IsTensorFlowEventsFile)
+ return directory_watcher.DirectoryWatcher(provider, loader_factory)
else:
return loader_factory(path)
diff --git a/tensorflow/python/summary/impl/directory_watcher.py b/tensorflow/python/summary/impl/directory_watcher.py
index 587c7a6d30..d3ece1f01d 100644
--- a/tensorflow/python/summary/impl/directory_watcher.py
+++ b/tensorflow/python/summary/impl/directory_watcher.py
@@ -25,52 +25,56 @@ from tensorflow.python.platform import logging
class DirectoryWatcher(object):
- """A DirectoryWatcher wraps a loader to load from a directory.
+ """A DirectoryWatcher wraps a loader to load from a sequence of paths.
- A loader reads a file on disk and produces some kind of values as an
- iterator. A DirectoryWatcher takes a directory with one file at a time being
- written to and a factory for loaders and watches all the files at once.
+ A loader reads a path and produces some kind of values as an iterator. A
+ DirectoryWatcher takes a directory, a path provider (see below) to call to
+ find new paths to load from, and a factory for loaders and watches all the
+ paths inside that directory.
+
+ A path provider is a function that, given either a path or None, returns the
+ next path to load from (or None if there is no such path). This class is only
+ valid under the assumption that only one path will be written to by the data
+ source at a time, and that the path_provider will return the oldest data
+ source that contains fresh data.
- This class is *only* valid under the assumption that files are never removed
- and the only file ever changed is whichever one is lexicographically last.
"""
- def __init__(self, directory, loader_factory, path_filter=lambda x: True):
+ def __init__(self, path_provider, loader_factory):
"""Constructs a new DirectoryWatcher.
Args:
- directory: The directory to watch. The directory doesn't have to exist.
+ path_provider: The callback to invoke when trying to find a new path to
+ load from. See the class documentation for the semantics of a path
+ provider.
loader_factory: A factory for creating loaders. The factory should take a
- file path and return an object that has a Load method returning an
+ path and return an object that has a Load method returning an
iterator that will yield all events that have not been yielded yet.
- path_filter: Only files whose full path matches this predicate will be
- loaded. If not specified, all files are loaded.
Raises:
- ValueError: If directory or loader_factory is None.
+ ValueError: If path_provider or loader_factory are None.
"""
- if directory is None:
- raise ValueError('A directory is required')
+ if path_provider is None:
+ raise ValueError('A path provider is required')
if loader_factory is None:
raise ValueError('A loader factory is required')
- self._directory = directory
+ self._path_provider = path_provider
+ self._path = None
self._loader_factory = loader_factory
self._loader = None
- self._path = ''
- self._path_filter = path_filter
def Load(self):
- """Loads new values from disk.
+ """Loads new values.
- The watcher will load from one file at a time; as soon as that file stops
- yielding events, it will move on to the next file. We assume that old files
- are never modified after a newer file has been written. As a result, Load()
+ The watcher will load from one path at a time; as soon as that path stops
+ yielding events, it will move on to the next path. We assume that old paths
+ are never modified after a newer path has been written. As a result, Load()
can be called multiple times in a row without losing events that have not
been yielded yet. In other words, we guarantee that every event will be
yielded exactly once.
Yields:
- All values that were written to disk that have not been yielded yet.
+ All values that have not been yielded yet.
"""
# If the loader exists, check it for a value.
@@ -78,39 +82,39 @@ class DirectoryWatcher(object):
self._InitializeLoader()
while True:
- # Yield all the new events in the file we're currently loading from.
+ # Yield all the new events in the path we're currently loading from.
for event in self._loader.Load():
yield event
next_path = self._GetNextPath()
if not next_path:
- logging.info('No more files in %s', self._directory)
- # Current file is empty and there are no new files, so we're done.
+ logging.info('No path found after %s', self._path)
+ # Current path is empty and there are no new paths, so we're done.
return
- # There's a new file, so check to make sure there weren't any events
- # written between when we finished reading the current file and when we
+ # There's a new path, so check to make sure there weren't any events
+ # written between when we finished reading the current path and when we
# checked for the new one. The sequence of events might look something
# like this:
#
- # 1. Event #1 written to file #1.
- # 2. We check for events and yield event #1 from file #1
- # 3. We check for events and see that there are no more events in file #1.
- # 4. Event #2 is written to file #1.
- # 5. Event #3 is written to file #2.
- # 6. We check for a new file and see that file #2 exists.
+ # 1. Event #1 written to path #1.
+ # 2. We check for events and yield event #1 from path #1
+ # 3. We check for events and see that there are no more events in path #1.
+ # 4. Event #2 is written to path #1.
+ # 5. Event #3 is written to path #2.
+ # 6. We check for a new path and see that path #2 exists.
#
# Without this loop, we would miss event #2. We're also guaranteed by the
- # loader contract that no more events will be written to file #1 after
- # events start being written to file #2, so we don't have to worry about
+ # loader contract that no more events will be written to path #1 after
+ # events start being written to path #2, so we don't have to worry about
# that.
for event in self._loader.Load():
yield event
- logging.info('Directory watcher for %s advancing to file %s',
- self._directory, next_path)
+ logging.info('Directory watcher advancing from %s to %s', self._path,
+ next_path)
- # Advance to the next file and start over.
+ # Advance to the next path and start over.
self._SetPath(next_path)
def _InitializeLoader(self):
@@ -125,10 +129,49 @@ class DirectoryWatcher(object):
self._loader = self._loader_factory(path)
def _GetNextPath(self):
- """Returns the path of the next file to use or None if no file exists."""
- sorted_paths = [os.path.join(self._directory, path)
- for path in sorted(gfile.ListDirectory(self._directory))]
- # We filter here so the filter gets the full directory name.
- filtered_paths = (path for path in sorted_paths
- if self._path_filter(path) and path > self._path)
- return next(filtered_paths, None)
+ """Returns the next path to use or None if no such path exists."""
+ return self._path_provider(self._path)
+
+
+def _SequentialProvider(path_source):
+ """A provider that iterates over the output of a function that produces paths.
+
+ _SequentialProvider takes in a path_source, which is a function that returns a
+ list of all currently available paths. _SequentialProvider returns in a path
+ provider (see documentation for the |DirectoryWatcher| class for the
+ semantics) that will return the alphabetically next path after the current one
+ (or the earliest path if the current path is None).
+
+ The provider will never return a path which is alphanumerically less than the
+ current path; as such, if the path source provides a high path (e.g. "c") and
+ later doubles back and provides a low path (e.g. "b"), once the current path
+ was set to "c" the _SequentialProvider will ignore the "b" and never return
+ it.
+
+ Args:
+ path_source: A function that returns an iterable of paths.
+
+ Returns:
+ A path provider for use with DirectoryWatcher.
+
+ """
+ def _Provider(current_path):
+ next_paths = list(path
+ for path in path_source()
+ if current_path is None or path > current_path)
+ if next_paths:
+ return min(next_paths)
+ else:
+ return None
+
+ return _Provider
+
+
+def SequentialGFileProvider(directory, path_filter=lambda x: True):
+ """Provides the files in a directory that match the given filter."""
+ def _Source():
+ paths = (os.path.join(directory, path)
+ for path in gfile.ListDirectory(directory))
+ return (path for path in paths if path_filter(path))
+
+ return _SequentialProvider(_Source)
diff --git a/tensorflow/python/summary/impl/directory_watcher_test.py b/tensorflow/python/summary/impl/directory_watcher_test.py
index f020c4fd03..784d585dfa 100644
--- a/tensorflow/python/summary/impl/directory_watcher_test.py
+++ b/tensorflow/python/summary/impl/directory_watcher_test.py
@@ -52,7 +52,7 @@ class DirectoryWatcherTest(test_util.TensorFlowTestCase):
self._directory = os.path.join(self.get_temp_dir(), 'monitor_dir')
os.mkdir(self._directory)
self._watcher = directory_watcher.DirectoryWatcher(
- self._directory, _ByteLoader)
+ directory_watcher.SequentialGFileProvider(self._directory), _ByteLoader)
def tearDown(self):
shutil.rmtree(self._directory)
@@ -69,7 +69,7 @@ class DirectoryWatcherTest(test_util.TensorFlowTestCase):
with self.assertRaises(ValueError):
directory_watcher.DirectoryWatcher(None, lambda x: [])
with self.assertRaises(ValueError):
- directory_watcher.DirectoryWatcher('asdf', None)
+ directory_watcher.DirectoryWatcher(lambda x: None, None)
def testEmptyDirectory(self):
self.assertWatcherYields([])
@@ -110,15 +110,17 @@ class DirectoryWatcherTest(test_util.TensorFlowTestCase):
self._WriteToFile('c', 'c')
self.assertWatcherYields(['a', 'c'])
- def testFileFilter(self):
- self._watcher = directory_watcher.DirectoryWatcher(
- self._directory, _ByteLoader,
+ def testPathFilter(self):
+ provider = directory_watcher.SequentialGFileProvider(
+ self._directory,
path_filter=lambda path: 'do_not_watch_me' not in path)
+ self._watcher = directory_watcher.DirectoryWatcher(provider, _ByteLoader)
self._WriteToFile('a', 'a')
self._WriteToFile('do_not_watch_me', 'b')
self._WriteToFile('c', 'c')
self.assertWatcherYields(['a', 'c'])
+
if __name__ == '__main__':
googletest.main()
diff --git a/tensorflow/tensorflow.bzl b/tensorflow/tensorflow.bzl
index 4c1af94b0d..22bc61ec68 100644
--- a/tensorflow/tensorflow.bzl
+++ b/tensorflow/tensorflow.bzl
@@ -9,7 +9,7 @@ load("//tensorflow/core:platform/default/build_config_root.bzl",
# List of proto files for android builds
def tf_android_core_proto_sources():
return [
- "//google/protobuf", # any.proto
+ "//google/protobuf:any.proto",
"//tensorflow/core:example/example.proto",
"//tensorflow/core:example/feature.proto",
"//tensorflow/core:framework/allocation_description.proto",