diff options
Diffstat (limited to 'tensorflow/core/util/events_writer.cc')
-rw-r--r-- | tensorflow/core/util/events_writer.cc | 144 |
1 files changed, 144 insertions, 0 deletions
diff --git a/tensorflow/core/util/events_writer.cc b/tensorflow/core/util/events_writer.cc new file mode 100644 index 0000000000..1b34a36577 --- /dev/null +++ b/tensorflow/core/util/events_writer.cc @@ -0,0 +1,144 @@ +#include "tensorflow/core/util/events_writer.h" + +#include <stddef.h> // for NULL + +#include "tensorflow/core/lib/io/path.h" +#include "tensorflow/core/lib/strings/strcat.h" +#include "tensorflow/core/lib/strings/stringprintf.h" +#include "tensorflow/core/platform/logging.h" +#include "tensorflow/core/platform/port.h" +#include "tensorflow/core/public/env.h" +#include "tensorflow/core/public/status.h" +#include "tensorflow/core/util/event.pb.h" + +namespace tensorflow { + +EventsWriter::EventsWriter(const string& file_prefix) + // TODO(jeff,sanjay): Pass in env and use that here instead of Env::Default + : env_(Env::Default()), + file_prefix_(file_prefix), + num_outstanding_events_(0) {} + +bool EventsWriter::Init() { + if (recordio_writer_.get() != nullptr) { + CHECK(!filename_.empty()); + if (FileHasDisappeared()) { + // Warn user of data loss and let .reset() below do basic cleanup. + if (num_outstanding_events_ > 0) { + LOG(WARNING) << "Re-intialization, attempting to open a new file, " + << num_outstanding_events_ << " events will be lost."; + } + } else { + // No-op: File is present and writer is initialized. + return true; + } + } + + int64 time_in_seconds = env_->NowMicros() / 1000000; + + filename_ = strings::Printf( + "%s.out.tfevents.%010lld.%s", file_prefix_.c_str(), + static_cast<long long>(time_in_seconds), port::Hostname().c_str()); + port::AdjustFilenameForLogging(&filename_); + + WritableFile* file; + Status s = env_->NewWritableFile(filename_, &file); + if (!s.ok()) { + LOG(ERROR) << "Could not open events file: " << filename_ << ": " << s; + return false; + } + recordio_file_.reset(file); + recordio_writer_.reset(new io::RecordWriter(recordio_file_.get())); + if (recordio_writer_.get() == NULL) { + LOG(ERROR) << "Could not create record writer"; + return false; + } + num_outstanding_events_ = 0; + VLOG(1) << "Successfully opened events file: " << filename_; + { + // Write the first event with the current version, and flush + // right away so the file contents will be easily determined. + + Event event; + event.set_wall_time(time_in_seconds); + event.set_file_version(strings::StrCat(kVersionPrefix, kCurrentVersion)); + WriteEvent(event); + Flush(); + } + return true; +} + +string EventsWriter::FileName() { + if (filename_.empty()) { + Init(); + } + return filename_; +} + +void EventsWriter::WriteSerializedEvent(const string& event_str) { + if (recordio_writer_.get() == NULL) { + if (!Init()) { + LOG(ERROR) << "Write failed because file could not be opened."; + return; + } + } + num_outstanding_events_++; + recordio_writer_->WriteRecord(event_str); +} + +void EventsWriter::WriteEvent(const Event& event) { + string record; + event.AppendToString(&record); + WriteSerializedEvent(record); +} + +bool EventsWriter::Flush() { + if (num_outstanding_events_ == 0) return true; + CHECK(recordio_file_.get() != NULL) << "Unexpected NULL file"; + // The FileHasDisappeared() condition is necessary because + // recordio_writer_->Sync() can return true even if the underlying + // file has been deleted. EventWriter.FileDeletionBeforeWriting + // demonstrates this and will fail if the FileHasDisappeared() + // conditon is removed. + // Also, we deliberately attempt to Sync() before checking for a + // disappearing file, in case for some file system File::Exists() is + // false after File::Open() but before File::Sync(). + if (!recordio_file_->Flush().ok() || !recordio_file_->Sync().ok() || + FileHasDisappeared()) { + LOG(ERROR) << "Failed to flush " << num_outstanding_events_ << " events to " + << filename_; + return false; + } + VLOG(1) << "Wrote " << num_outstanding_events_ << " events to disk."; + num_outstanding_events_ = 0; + return true; +} + +bool EventsWriter::Close() { + bool return_value = Flush(); + if (recordio_file_.get() != NULL) { + Status s = recordio_file_->Close(); + if (!s.ok()) { + LOG(ERROR) << "Error when closing previous event file: " << filename_ + << ": " << s; + return_value = false; + } + recordio_writer_.reset(NULL); + recordio_file_.reset(NULL); + } + num_outstanding_events_ = 0; + return return_value; +} + +bool EventsWriter::FileHasDisappeared() { + if (env_->FileExists(filename_)) { + return false; + } else { + // This can happen even with non-null recordio_writer_ if some other + // process has removed the file. + LOG(ERROR) << "The events file " << filename_ << " has disappeared."; + return true; + } +} + +} // namespace tensorflow |