From e6e8d8715552d8890c0dd10f49ec3dff931a9926 Mon Sep 17 00:00:00 2001
From: Jonathan Hseu
Date: Thu, 17 Nov 2016 13:59:10 -0800
Subject: HDFS: Reopen the file when reading if we reach EOF.

- HDFS requires readers to reopen the file to get updated contents when
  reading. See the context at #5438
- Also switched to HFlush(), which is the non-deprecated Flush() call.
- Fixes #5438

Change: 139503549
---
 .../core/platform/hadoop/hadoop_file_system.cc     | 56 +++++++++++++++++-----
 .../platform/hadoop/hadoop_file_system_test.cc     | 38 +++++++++++++++
 2 files changed, 83 insertions(+), 11 deletions(-)

diff --git a/tensorflow/core/platform/hadoop/hadoop_file_system.cc b/tensorflow/core/platform/hadoop/hadoop_file_system.cc
index fd9aa3f685..3de3b17517 100644
--- a/tensorflow/core/platform/hadoop/hadoop_file_system.cc
+++ b/tensorflow/core/platform/hadoop/hadoop_file_system.cc
@@ -23,6 +23,7 @@ limitations under the License.
 #include "tensorflow/core/platform/env.h"
 #include "tensorflow/core/platform/file_system.h"
 #include "tensorflow/core/platform/logging.h"
+#include "tensorflow/core/platform/mutex.h"
 #include "tensorflow/core/platform/posix/error.h"
 #include "third_party/hadoop/hdfs.h"
 
@@ -61,7 +62,7 @@ class LibHDFS {
   std::function<int(hdfsFS, hdfsFile)> hdfsCloseFile;
   std::function<tSize(hdfsFS, hdfsFile, tOffset, void*, tSize)> hdfsPread;
   std::function<tSize(hdfsFS, hdfsFile, const void*, tSize)> hdfsWrite;
-  std::function<int(hdfsFS, hdfsFile)> hdfsFlush;
+  std::function<int(hdfsFS, hdfsFile)> hdfsHFlush;
   std::function<int(hdfsFS, hdfsFile)> hdfsHSync;
   std::function<hdfsFile(hdfsFS, const char*, int, int, short, tSize)>
       hdfsOpenFile;
@@ -87,7 +88,7 @@ class LibHDFS {
     BIND_HDFS_FUNC(hdfsCloseFile);
     BIND_HDFS_FUNC(hdfsPread);
     BIND_HDFS_FUNC(hdfsWrite);
-    BIND_HDFS_FUNC(hdfsFlush);
+    BIND_HDFS_FUNC(hdfsHFlush);
     BIND_HDFS_FUNC(hdfsHSync);
     BIND_HDFS_FUNC(hdfsOpenFile);
     BIND_HDFS_FUNC(hdfsExists);
@@ -157,24 +158,53 @@ string HadoopFileSystem::TranslateName(const string& name) const {
 
 class HDFSRandomAccessFile : public RandomAccessFile {
  public:
-  HDFSRandomAccessFile(const string& fname, LibHDFS* hdfs, hdfsFS fs,
-                       hdfsFile file)
-      : filename_(fname), hdfs_(hdfs), fs_(fs), file_(file) {}
-
-  ~HDFSRandomAccessFile() override { hdfs_->hdfsCloseFile(fs_, file_); }
+  HDFSRandomAccessFile(const string& filename, const string& hdfs_filename,
+                       LibHDFS* hdfs, hdfsFS fs, hdfsFile file)
+      : filename_(filename),
+        hdfs_filename_(hdfs_filename),
+        hdfs_(hdfs),
+        fs_(fs),
+        file_(file) {}
+
+  ~HDFSRandomAccessFile() override {
+    if (file_ != nullptr) {
+      mutex_lock lock(mu_);
+      hdfs_->hdfsCloseFile(fs_, file_);
+    }
+  }
 
   Status Read(uint64 offset, size_t n, StringPiece* result,
               char* scratch) const override {
     Status s;
     char* dst = scratch;
+    bool eof_retried = false;
     while (n > 0 && s.ok()) {
+      // We lock inside the loop rather than outside so we don't block other
+      // concurrent readers.
+      mutex_lock lock(mu_);
       tSize r = hdfs_->hdfsPread(fs_, file_, static_cast<tOffset>(offset), dst,
                                  static_cast<tSize>(n));
       if (r > 0) {
         dst += r;
         n -= r;
         offset += r;
-      } else if (r == 0) {
+      } else if (!eof_retried && r == 0) {
+        // Always reopen the file upon reaching EOF to see if there's more data.
+        // If writers are streaming contents while others are concurrently
+        // reading, HDFS requires that we reopen the file to see updated
+        // contents.
+        //
+        // Fixes #5438
+        if (file_ != nullptr && hdfs_->hdfsCloseFile(fs_, file_) != 0) {
+          return IOError(filename_, errno);
+        }
+        file_ =
+            hdfs_->hdfsOpenFile(fs_, hdfs_filename_.c_str(), O_RDONLY, 0, 0, 0);
+        if (file_ == nullptr) {
+          return IOError(filename_, errno);
+        }
+        eof_retried = true;
+      } else if (eof_retried && r == 0) {
         s = Status(error::OUT_OF_RANGE, "Read less bytes than requested");
       } else if (errno == EINTR || errno == EAGAIN) {
         // hdfsPread may return EINTR too. Just retry.
@@ -188,9 +218,12 @@ class HDFSRandomAccessFile : public RandomAccessFile {
 
  private:
   string filename_;
+  string hdfs_filename_;
   LibHDFS* hdfs_;
   hdfsFS fs_;
-  hdfsFile file_;
+
+  mutable mutex mu_;
+  mutable hdfsFile file_ GUARDED_BY(mu_);
 };
 
 Status HadoopFileSystem::NewRandomAccessFile(
@@ -203,7 +236,8 @@ Status HadoopFileSystem::NewRandomAccessFile(
   if (file == nullptr) {
     return IOError(fname, errno);
   }
-  result->reset(new HDFSRandomAccessFile(fname, hdfs_, fs, file));
+  result->reset(
+      new HDFSRandomAccessFile(fname, TranslateName(fname), hdfs_, fs, file));
   return Status::OK();
 }
 
@@ -238,7 +272,7 @@ class HDFSWritableFile : public WritableFile {
   }
 
   Status Flush() override {
-    if (hdfs_->hdfsFlush(fs_, file_) != 0) {
+    if (hdfs_->hdfsHFlush(fs_, file_) != 0) {
       return IOError(filename_, errno);
     }
     return Status::OK();
diff --git a/tensorflow/core/platform/hadoop/hadoop_file_system_test.cc b/tensorflow/core/platform/hadoop/hadoop_file_system_test.cc
index 59e1d23645..6ba2f04d0f 100644
--- a/tensorflow/core/platform/hadoop/hadoop_file_system_test.cc
+++ b/tensorflow/core/platform/hadoop/hadoop_file_system_test.cc
@@ -191,6 +191,44 @@ TEST_F(HadoopFileSystemTest, StatFile) {
   EXPECT_FALSE(stat.is_directory);
 }
 
+TEST_F(HadoopFileSystemTest, WriteWhileReading) {
+  std::unique_ptr<WritableFile> writer;
+  const string fname = TmpDir("WriteWhileReading");
+  // Skip the test if we're not testing on HDFS. Hadoop's local filesystem
+  // implementation makes no guarantees that writable files are readable while
+  // being written.
+  if (!StringPiece(fname).starts_with("hdfs://")) {
+    return;
+  }
+
+  TF_EXPECT_OK(hdfs.NewWritableFile(fname, &writer));
+
+  const string content1 = "content1";
+  TF_EXPECT_OK(writer->Append(content1));
+  TF_EXPECT_OK(writer->Flush());
+
+  std::unique_ptr<RandomAccessFile> reader;
+  TF_EXPECT_OK(hdfs.NewRandomAccessFile(fname, &reader));
+
+  string got;
+  got.resize(content1.size());
+  StringPiece result;
+  TF_EXPECT_OK(
+      reader->Read(0, content1.size(), &result, gtl::string_as_array(&got)));
+  EXPECT_EQ(content1, result);
+
+  string content2 = "content2";
+  TF_EXPECT_OK(writer->Append(content2));
+  TF_EXPECT_OK(writer->Flush());
+
+  got.resize(content2.size());
+  TF_EXPECT_OK(reader->Read(content1.size(), content2.size(), &result,
+                            gtl::string_as_array(&got)));
+  EXPECT_EQ(content2, result);
+
+  TF_EXPECT_OK(writer->Close());
+}
+
 // NewAppendableFile() is not testable. Local filesystem maps to
 // ChecksumFileSystem in Hadoop, where appending is an unsupported operation.
-- 
cgit v1.2.3
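
For reference, the reopen-on-EOF pattern the patch implements can also be
written directly against libhdfs, adapted here into a simple tailing loop.
This sketch is illustrative only, not part of the patch: the namenode spec
("default"), the path /tmp/streamed_file, the buffer size, and the error
handling are all assumptions. It uses only standard hdfs.h entry points
(hdfsConnect, hdfsOpenFile, hdfsPread, hdfsCloseFile, hdfsDisconnect).

// Sketch: tail an HDFS file that a concurrent writer may still be appending
// to. On EOF, close and reopen the file once to pick up newly flushed data
// before concluding we are really at the end -- the same idea
// HDFSRandomAccessFile::Read() implements in the patch above.
#include <fcntl.h>   // O_RDONLY
#include <cerrno>
#include <cstdio>
#include <vector>
#include "hdfs.h"

int main() {
  hdfsFS fs = hdfsConnect("default", 0);     // assumed: default namenode config
  if (fs == nullptr) return 1;
  const char* path = "/tmp/streamed_file";   // hypothetical file being appended to
  hdfsFile file = hdfsOpenFile(fs, path, O_RDONLY, 0, 0, 0);
  if (file == nullptr) return 1;

  std::vector<char> buf(1 << 16);
  tOffset offset = 0;
  bool eof_retried = false;
  while (true) {
    tSize r = hdfsPread(fs, file, offset, buf.data(),
                        static_cast<tSize>(buf.size()));
    if (r > 0) {
      fwrite(buf.data(), 1, static_cast<size_t>(r), stdout);
      offset += r;
      eof_retried = false;  // fresh data resets the one-reopen retry budget
    } else if (r == 0 && !eof_retried) {
      // EOF: data flushed after our open is only visible through a fresh
      // handle, so reopen and retry the same offset once.
      hdfsCloseFile(fs, file);
      file = hdfsOpenFile(fs, path, O_RDONLY, 0, 0, 0);
      if (file == nullptr) return 1;
      eof_retried = true;
    } else if (r == 0) {
      break;  // still EOF after one reopen: no more data for now
    } else if (errno == EINTR || errno == EAGAIN) {
      continue;  // transient failure from hdfsPread; just retry
    } else {
      return 1;  // hard error
    }
  }
  hdfsCloseFile(fs, file);
  hdfsDisconnect(fs);
  return 0;
}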
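
The hdfsFlush-to-hdfsHFlush switch is what the new WriteWhileReading test
depends on: per the commit message, hdfsHFlush is the non-deprecated flush
call, and it makes appended bytes visible to readers that open (or reopen)
the file. A minimal writer-side sketch under the same assumptions as above:

#include <fcntl.h>   // O_WRONLY
#include <cstring>
#include "hdfs.h"

int main() {
  hdfsFS fs = hdfsConnect("default", 0);  // assumed: default namenode config
  if (fs == nullptr) return 1;
  hdfsFile file = hdfsOpenFile(fs, "/tmp/streamed_file", O_WRONLY, 0, 0, 0);
  if (file == nullptr) return 1;

  const char* chunk = "content1";
  if (hdfsWrite(fs, file, chunk, static_cast<tSize>(std::strlen(chunk))) < 0) {
    return 1;
  }
  // After hdfsHFlush returns, a reader that opens (or reopens) the file sees
  // the bytes written so far -- which is exactly what the reopen-on-EOF
  // reader above relies on.
  if (hdfsHFlush(fs, file) != 0) return 1;

  // ... append and hflush further chunks as they become available ...

  hdfsCloseFile(fs, file);
  hdfsDisconnect(fs);
  return 0;
}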