diff options
author | Patrick Nguyen <drpng@google.com> | 2017-04-17 20:41:44 -0800 |
---|---|---|
committer | TensorFlower Gardener <gardener@tensorflow.org> | 2017-04-17 22:15:14 -0700 |
commit | 69a4cf80a129af3fe46b6ff9c509be442d5a06f9 (patch) | |
tree | 518cff65134008841e91018be821e843500dabbd /tensorflow/python/summary | |
parent | cca1b71352d246fc292d6e6b9cda63810c659c83 (diff) |
Merge changes from github.
Change: 153426348
Diffstat (limited to 'tensorflow/python/summary')
-rw-r--r-- | tensorflow/python/summary/writer/event_file_writer.py | 22 | ||||
-rw-r--r-- | tensorflow/python/summary/writer/writer_test.py | 9 |
2 files changed, 29 insertions, 2 deletions
diff --git a/tensorflow/python/summary/writer/event_file_writer.py b/tensorflow/python/summary/writer/event_file_writer.py index 8940d9b72e..2936a279bd 100644 --- a/tensorflow/python/summary/writer/event_file_writer.py +++ b/tensorflow/python/summary/writer/event_file_writer.py @@ -24,6 +24,7 @@ import time import six +from tensorflow.core.util import event_pb2 from tensorflow.python import pywrap_tensorflow from tensorflow.python.platform import gfile from tensorflow.python.util import compat @@ -67,14 +68,20 @@ class EventFileWriter(object): self._event_queue = six.moves.queue.Queue(max_queue) self._ev_writer = pywrap_tensorflow.EventsWriter( compat.as_bytes(os.path.join(self._logdir, "events"))) + self._flush_secs = flush_secs + self._sentinel_event = self._get_sentinel_event() if filename_suffix: self._ev_writer.InitWithSuffix(compat.as_bytes(filename_suffix)) self._closed = False self._worker = _EventLoggerThread(self._event_queue, self._ev_writer, - flush_secs) + self._flush_secs, self._sentinel_event) self._worker.start() + def _get_sentinel_event(self): + """Generate a sentinel event for terminating worker.""" + return event_pb2.Event() + def get_logdir(self): """Returns the directory where event file will be written.""" return self._logdir @@ -88,6 +95,9 @@ class EventFileWriter(object): Does nothing if the EventFileWriter was not closed. """ if self._closed: + self._worker = _EventLoggerThread(self._event_queue, self._ev_writer, + self._flush_secs, self._sentinel_event) + self._worker.start() self._closed = False def add_event(self, event): @@ -113,7 +123,9 @@ class EventFileWriter(object): Call this method when you do not need the summary writer anymore. """ + self.add_event(self._sentinel_event) self.flush() + self._worker.join() self._ev_writer.Close() self._closed = True @@ -121,7 +133,7 @@ class EventFileWriter(object): class _EventLoggerThread(threading.Thread): """Thread that logs events.""" - def __init__(self, queue, ev_writer, flush_secs): + def __init__(self, queue, ev_writer, flush_secs, sentinel_event): """Creates an _EventLoggerThread. Args: @@ -130,6 +142,8 @@ class _EventLoggerThread(threading.Thread): the visualizer. flush_secs: How often, in seconds, to flush the pending file to disk. + sentinel_event: A sentinel element in queue that tells this thread to + terminate. """ threading.Thread.__init__(self) self.daemon = True @@ -138,10 +152,14 @@ class _EventLoggerThread(threading.Thread): self._flush_secs = flush_secs # The first event will be flushed immediately. self._next_event_flush_time = 0 + self._sentinel_event = sentinel_event def run(self): while True: event = self._queue.get() + if event is self._sentinel_event: + self._queue.task_done() + break try: self._ev_writer.WriteEvent(event) # Flush the event writer every so often. diff --git a/tensorflow/python/summary/writer/writer_test.py b/tensorflow/python/summary/writer/writer_test.py index b31c41d112..8c34eb82e3 100644 --- a/tensorflow/python/summary/writer/writer_test.py +++ b/tensorflow/python/summary/writer/writer_test.py @@ -258,6 +258,15 @@ class SummaryWriterTestCase(test.TestCase): # We should be done. self.assertRaises(StopIteration, lambda: next(rr)) + def testNonBlockingClose(self): + test_dir = self._CleanTestDir("non_blocking_close") + sw = writer.FileWriter(test_dir) + # Sleep 1.2 seconds to make sure event queue is empty. + time.sleep(1.2) + time_before_close = time.time() + sw.close() + self._assertRecent(time_before_close) + # Checks that values returned from session Run() calls are added correctly to # summaries. These are numpy types so we need to check they fit in the # protocol buffers correctly. |