author     Jonathan Hseu <jhseu@google.com>                  2016-11-17 13:59:10 -0800
committer  TensorFlower Gardener <gardener@tensorflow.org>   2016-11-17 14:05:10 -0800
commit     e6e8d8715552d8890c0dd10f49ec3dff931a9926
tree       af66d3f59ed8012457093a9c090136fde3c2a820
parent     5329ac7a4c8c20df5e22c928a82ccc943dc9100c
HDFS: Reopen the file when reading if we reach EOF.
- HDFS requires readers to reopen the file to get updated contents when
  reading. See the context at #5438.
- Also switched to HFlush(), which is the non-deprecated Flush() call.
- Fixes #5438

Change: 139503549
-rw-r--r--  tensorflow/core/platform/hadoop/hadoop_file_system.cc      | 56
-rw-r--r--  tensorflow/core/platform/hadoop/hadoop_file_system_test.cc | 38
2 files changed, 83 insertions(+), 11 deletions(-)
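
Background for the change below: an HDFS reader holding an open handle does
not observe data a concurrent writer has hflushed after the handle was
opened; the reader must reopen the file. The following is a minimal
standalone sketch of that reopen-on-EOF pattern against the libhdfs C API.
The wrapper name is illustrative, `fs` is assumed to be an already-connected
hdfsFS, and error handling is abbreviated:

    // Sketch only: read up to `n` bytes at `offset`, reopening the file at
    // most once on EOF so that data hflushed by a concurrent writer becomes
    // visible. Assumes `fs` is a connected hdfsFS; names are illustrative.
    #include <fcntl.h>
    #include "hdfs.h"

    tSize PreadWithEofReopen(hdfsFS fs, hdfsFile* file, const char* path,
                             tOffset offset, void* buf, tSize n) {
      bool eof_retried = false;
      while (true) {
        tSize r = hdfsPread(fs, *file, offset, buf, n);
        if (r != 0 || eof_retried) return r;  // data, error, or second EOF
        // r == 0 means EOF with the current handle; reopen and retry once.
        if (hdfsCloseFile(fs, *file) != 0) return -1;
        *file = hdfsOpenFile(fs, path, O_RDONLY, /*bufferSize=*/0,
                             /*replication=*/0, /*blocksize=*/0);
        if (*file == nullptr) return -1;
        eof_retried = true;
      }
    }
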
diff --git a/tensorflow/core/platform/hadoop/hadoop_file_system.cc b/tensorflow/core/platform/hadoop/hadoop_file_system.cc
index fd9aa3f685..3de3b17517 100644
--- a/tensorflow/core/platform/hadoop/hadoop_file_system.cc
+++ b/tensorflow/core/platform/hadoop/hadoop_file_system.cc
@@ -23,6 +23,7 @@ limitations under the License.
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/file_system.h"
#include "tensorflow/core/platform/logging.h"
+#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/posix/error.h"
#include "third_party/hadoop/hdfs.h"
@@ -61,7 +62,7 @@ class LibHDFS {
std::function<int(hdfsFS, hdfsFile)> hdfsCloseFile;
std::function<tSize(hdfsFS, hdfsFile, tOffset, void*, tSize)> hdfsPread;
std::function<tSize(hdfsFS, hdfsFile, const void*, tSize)> hdfsWrite;
- std::function<int(hdfsFS, hdfsFile)> hdfsFlush;
+ std::function<int(hdfsFS, hdfsFile)> hdfsHFlush;
std::function<int(hdfsFS, hdfsFile)> hdfsHSync;
std::function<hdfsFile(hdfsFS, const char*, int, int, short, tSize)>
hdfsOpenFile;
@@ -87,7 +88,7 @@ class LibHDFS {
BIND_HDFS_FUNC(hdfsCloseFile);
BIND_HDFS_FUNC(hdfsPread);
BIND_HDFS_FUNC(hdfsWrite);
- BIND_HDFS_FUNC(hdfsFlush);
+ BIND_HDFS_FUNC(hdfsHFlush);
BIND_HDFS_FUNC(hdfsHSync);
BIND_HDFS_FUNC(hdfsOpenFile);
BIND_HDFS_FUNC(hdfsExists);
@@ -157,24 +158,53 @@ string HadoopFileSystem::TranslateName(const string& name) const {
class HDFSRandomAccessFile : public RandomAccessFile {
public:
- HDFSRandomAccessFile(const string& fname, LibHDFS* hdfs, hdfsFS fs,
- hdfsFile file)
- : filename_(fname), hdfs_(hdfs), fs_(fs), file_(file) {}
-
- ~HDFSRandomAccessFile() override { hdfs_->hdfsCloseFile(fs_, file_); }
+ HDFSRandomAccessFile(const string& filename, const string& hdfs_filename,
+ LibHDFS* hdfs, hdfsFS fs, hdfsFile file)
+ : filename_(filename),
+ hdfs_filename_(hdfs_filename),
+ hdfs_(hdfs),
+ fs_(fs),
+ file_(file) {}
+
+ ~HDFSRandomAccessFile() override {
+ if (file_ != nullptr) {
+ mutex_lock lock(mu_);
+ hdfs_->hdfsCloseFile(fs_, file_);
+ }
+ }
Status Read(uint64 offset, size_t n, StringPiece* result,
char* scratch) const override {
Status s;
char* dst = scratch;
+ bool eof_retried = false;
while (n > 0 && s.ok()) {
+ // We lock inside the loop rather than outside so we don't block other
+ // concurrent readers.
+ mutex_lock lock(mu_);
tSize r = hdfs_->hdfsPread(fs_, file_, static_cast<tOffset>(offset), dst,
static_cast<tSize>(n));
if (r > 0) {
dst += r;
n -= r;
offset += r;
- } else if (r == 0) {
+ } else if (!eof_retried && r == 0) {
+ // Always reopen the file upon reaching EOF to see if there's more data.
+ // If writers are streaming contents while others are concurrently
+ // reading, HDFS requires that we reopen the file to see updated
+ // contents.
+ //
+ // Fixes #5438
+ if (file_ != nullptr && hdfs_->hdfsCloseFile(fs_, file_) != 0) {
+ return IOError(filename_, errno);
+ }
+ file_ =
+ hdfs_->hdfsOpenFile(fs_, hdfs_filename_.c_str(), O_RDONLY, 0, 0, 0);
+ if (file_ == nullptr) {
+ return IOError(filename_, errno);
+ }
+ eof_retried = true;
+ } else if (eof_retried && r == 0) {
s = Status(error::OUT_OF_RANGE, "Read less bytes than requested");
} else if (errno == EINTR || errno == EAGAIN) {
// hdfsPread may return EINTR too. Just retry.
@@ -188,9 +218,12 @@ class HDFSRandomAccessFile : public RandomAccessFile {
private:
string filename_;
+ string hdfs_filename_;
LibHDFS* hdfs_;
hdfsFS fs_;
- hdfsFile file_;
+
+ mutable mutex mu_;
+ mutable hdfsFile file_ GUARDED_BY(mu_);
};
Status HadoopFileSystem::NewRandomAccessFile(
@@ -203,7 +236,8 @@ Status HadoopFileSystem::NewRandomAccessFile(
if (file == nullptr) {
return IOError(fname, errno);
}
- result->reset(new HDFSRandomAccessFile(fname, hdfs_, fs, file));
+ result->reset(
+ new HDFSRandomAccessFile(fname, TranslateName(fname), hdfs_, fs, file));
return Status::OK();
}
@@ -238,7 +272,7 @@ class HDFSWritableFile : public WritableFile {
}
Status Flush() override {
- if (hdfs_->hdfsFlush(fs_, file_) != 0) {
+ if (hdfs_->hdfsHFlush(fs_, file_) != 0) {
return IOError(filename_, errno);
}
return Status::OK();
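
On the write side, the patch binds hdfsHFlush() in place of the deprecated
hdfsFlush(). The distinction libhdfs draws is between hflush, after which
flushed data becomes visible to readers that open or reopen the file but is
not guaranteed durable, and hsync, which additionally persists the data to
disk on the datanodes. A small illustrative pair of wrappers, not part of
this patch:

    // Illustrative only. `fs`/`file` are an open hdfsFS/hdfsFile pair.
    #include "hdfs.h"

    int FlushForReaders(hdfsFS fs, hdfsFile file) {
      // Makes buffered writes visible to readers that (re)open the file,
      // but does not guarantee the data has reached disk.
      return hdfsHFlush(fs, file);
    }

    int SyncToDisk(hdfsFS fs, hdfsFile file) {
      // Stronger: like hflush, plus the data is persisted to disk on the
      // datanodes, at a higher latency cost.
      return hdfsHSync(fs, file);
    }
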
diff --git a/tensorflow/core/platform/hadoop/hadoop_file_system_test.cc b/tensorflow/core/platform/hadoop/hadoop_file_system_test.cc
index 59e1d23645..6ba2f04d0f 100644
--- a/tensorflow/core/platform/hadoop/hadoop_file_system_test.cc
+++ b/tensorflow/core/platform/hadoop/hadoop_file_system_test.cc
@@ -191,6 +191,44 @@ TEST_F(HadoopFileSystemTest, StatFile) {
EXPECT_FALSE(stat.is_directory);
}
+TEST_F(HadoopFileSystemTest, WriteWhileReading) {
+ std::unique_ptr<WritableFile> writer;
+ const string fname = TmpDir("WriteWhileReading");
+ // Skip the test if we're not testing on HDFS. Hadoop's local filesystem
+ // implementation makes no guarantees that writable files are readable while
+ // being written.
+ if (!StringPiece(fname).starts_with("hdfs://")) {
+ return;
+ }
+
+ TF_EXPECT_OK(hdfs.NewWritableFile(fname, &writer));
+
+ const string content1 = "content1";
+ TF_EXPECT_OK(writer->Append(content1));
+ TF_EXPECT_OK(writer->Flush());
+
+ std::unique_ptr<RandomAccessFile> reader;
+ TF_EXPECT_OK(hdfs.NewRandomAccessFile(fname, &reader));
+
+ string got;
+ got.resize(content1.size());
+ StringPiece result;
+ TF_EXPECT_OK(
+ reader->Read(0, content1.size(), &result, gtl::string_as_array(&got)));
+ EXPECT_EQ(content1, result);
+
+ string content2 = "content2";
+ TF_EXPECT_OK(writer->Append(content2));
+ TF_EXPECT_OK(writer->Flush());
+
+ got.resize(content2.size());
+ TF_EXPECT_OK(reader->Read(content1.size(), content2.size(), &result,
+ gtl::string_as_array(&got)));
+ EXPECT_EQ(content2, result);
+
+ TF_EXPECT_OK(writer->Close());
+}
+
// NewAppendableFile() is not testable. Local filesystem maps to
// ChecksumFileSystem in Hadoop, where appending is an unsupported operation.
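
One detail worth calling out in HDFSRandomAccessFile above: Read() is const,
yet the EOF path must replace file_, so both the mutex and the handle are
declared mutable (with GUARDED_BY for Clang's thread-safety analysis). A
generic, non-HDFS sketch of the same idiom, with illustrative names:

    #include <mutex>

    // Sketch of "mutable state behind a logically-const read": a const
    // accessor may still refresh an internal handle under a lock.
    class CachedReader {
     public:
      int Read() const {
        // Lock per call (as the patch locks per loop iteration) so one slow
        // reader does not hold the lock across an entire multi-read operation.
        std::lock_guard<std::mutex> lock(mu_);
        if (stale_) Refresh();
        return value_;
      }

     private:
      void Refresh() const {  // placeholder for "close and reopen the handle"
        stale_ = false;
        value_ = 42;
      }
      mutable std::mutex mu_;
      mutable bool stale_ = true;  // guarded by mu_
      mutable int value_ = 0;      // guarded by mu_
    };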