aboutsummaryrefslogtreecommitdiffhomepage
path: root/tensorflow/python/summary
diff options
context:
space:
mode:
authorGravatar Patrick Nguyen <drpng@google.com>2017-04-17 20:41:44 -0800
committerGravatar TensorFlower Gardener <gardener@tensorflow.org>2017-04-17 22:15:14 -0700
commit69a4cf80a129af3fe46b6ff9c509be442d5a06f9 (patch)
tree518cff65134008841e91018be821e843500dabbd /tensorflow/python/summary
parentcca1b71352d246fc292d6e6b9cda63810c659c83 (diff)
Merge changes from github.
Change: 153426348
Diffstat (limited to 'tensorflow/python/summary')
-rw-r--r--tensorflow/python/summary/writer/event_file_writer.py22
-rw-r--r--tensorflow/python/summary/writer/writer_test.py9
2 files changed, 29 insertions, 2 deletions
diff --git a/tensorflow/python/summary/writer/event_file_writer.py b/tensorflow/python/summary/writer/event_file_writer.py
index 8940d9b72e..2936a279bd 100644
--- a/tensorflow/python/summary/writer/event_file_writer.py
+++ b/tensorflow/python/summary/writer/event_file_writer.py
@@ -24,6 +24,7 @@ import time
import six
+from tensorflow.core.util import event_pb2
from tensorflow.python import pywrap_tensorflow
from tensorflow.python.platform import gfile
from tensorflow.python.util import compat
@@ -67,14 +68,20 @@ class EventFileWriter(object):
self._event_queue = six.moves.queue.Queue(max_queue)
self._ev_writer = pywrap_tensorflow.EventsWriter(
compat.as_bytes(os.path.join(self._logdir, "events")))
+ self._flush_secs = flush_secs
+ self._sentinel_event = self._get_sentinel_event()
if filename_suffix:
self._ev_writer.InitWithSuffix(compat.as_bytes(filename_suffix))
self._closed = False
self._worker = _EventLoggerThread(self._event_queue, self._ev_writer,
- flush_secs)
+ self._flush_secs, self._sentinel_event)
self._worker.start()
+ def _get_sentinel_event(self):
+ """Generate a sentinel event for terminating worker."""
+ return event_pb2.Event()
+
def get_logdir(self):
"""Returns the directory where event file will be written."""
return self._logdir
@@ -88,6 +95,9 @@ class EventFileWriter(object):
Does nothing if the EventFileWriter was not closed.
"""
if self._closed:
+ self._worker = _EventLoggerThread(self._event_queue, self._ev_writer,
+ self._flush_secs, self._sentinel_event)
+ self._worker.start()
self._closed = False
def add_event(self, event):
@@ -113,7 +123,9 @@ class EventFileWriter(object):
Call this method when you do not need the summary writer anymore.
"""
+ self.add_event(self._sentinel_event)
self.flush()
+ self._worker.join()
self._ev_writer.Close()
self._closed = True
@@ -121,7 +133,7 @@ class EventFileWriter(object):
class _EventLoggerThread(threading.Thread):
"""Thread that logs events."""
- def __init__(self, queue, ev_writer, flush_secs):
+ def __init__(self, queue, ev_writer, flush_secs, sentinel_event):
"""Creates an _EventLoggerThread.
Args:
@@ -130,6 +142,8 @@ class _EventLoggerThread(threading.Thread):
the visualizer.
flush_secs: How often, in seconds, to flush the
pending file to disk.
+ sentinel_event: A sentinel element in queue that tells this thread to
+ terminate.
"""
threading.Thread.__init__(self)
self.daemon = True
@@ -138,10 +152,14 @@ class _EventLoggerThread(threading.Thread):
self._flush_secs = flush_secs
# The first event will be flushed immediately.
self._next_event_flush_time = 0
+ self._sentinel_event = sentinel_event
def run(self):
while True:
event = self._queue.get()
+ if event is self._sentinel_event:
+ self._queue.task_done()
+ break
try:
self._ev_writer.WriteEvent(event)
# Flush the event writer every so often.
diff --git a/tensorflow/python/summary/writer/writer_test.py b/tensorflow/python/summary/writer/writer_test.py
index b31c41d112..8c34eb82e3 100644
--- a/tensorflow/python/summary/writer/writer_test.py
+++ b/tensorflow/python/summary/writer/writer_test.py
@@ -258,6 +258,15 @@ class SummaryWriterTestCase(test.TestCase):
# We should be done.
self.assertRaises(StopIteration, lambda: next(rr))
+ def testNonBlockingClose(self):
+ test_dir = self._CleanTestDir("non_blocking_close")
+ sw = writer.FileWriter(test_dir)
+ # Sleep 1.2 seconds to make sure event queue is empty.
+ time.sleep(1.2)
+ time_before_close = time.time()
+ sw.close()
+ self._assertRecent(time_before_close)
+
# Checks that values returned from session Run() calls are added correctly to
# summaries. These are numpy types so we need to check they fit in the
# protocol buffers correctly.