author     A. Unique TensorFlower <gardener@tensorflow.org>  2018-01-04 18:17:49 -0800
committer  TensorFlower Gardener <gardener@tensorflow.org>   2018-01-04 18:21:42 -0800
commit     c9ed9bf846c6c8e8566082ce4ac201a529c23355 (patch)
tree       7138a307c9baf54df8799884e8f9bed3a43d7b71
parent     4f877d4e54bb2427882f4a800607a1cf0531b293 (diff)
Add a FlushCaches() method to the FileSystem interface, and provide an implementation for GcsFileSystem.
PiperOrigin-RevId: 180873963
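
A minimal caller-side sketch (not part of this patch) of how the new FlushCaches() entry point can be reached through the FileSystem registry once the change is in; the GCS path below is a placeholder used only to select the registered filesystem:

  // Sketch only; assumes this patch is applied. "gs://my-bucket/obj" is a
  // placeholder path, not a real object.
  #include "tensorflow/core/platform/env.h"
  #include "tensorflow/core/platform/file_system.h"

  void FlushCachesForGcs() {
    tensorflow::FileSystem* fs = nullptr;
    tensorflow::Status s = tensorflow::Env::Default()->GetFileSystemForFile(
        "gs://my-bucket/obj", &fs);
    if (s.ok() && fs != nullptr) {
      // For GcsFileSystem this drops the block, stat and matching-paths
      // caches; filesystems without caches inherit the no-op default.
      fs->FlushCaches();
    }
  }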
-rw-r--r--  tensorflow/core/platform/cloud/expiring_lru_cache.h        |   7
-rw-r--r--  tensorflow/core/platform/cloud/expiring_lru_cache_test.cc  |  22
-rw-r--r--  tensorflow/core/platform/cloud/file_block_cache.cc         |   8
-rw-r--r--  tensorflow/core/platform/cloud/file_block_cache.h          |   3
-rw-r--r--  tensorflow/core/platform/cloud/file_block_cache_test.cc    |  20
-rw-r--r--  tensorflow/core/platform/cloud/gcs_file_system.cc          |   9
-rw-r--r--  tensorflow/core/platform/cloud/gcs_file_system.h           |   2
-rw-r--r--  tensorflow/core/platform/cloud/gcs_file_system_test.cc     | 131
-rw-r--r--  tensorflow/core/platform/file_system.cc                    |   2
-rw-r--r--  tensorflow/core/platform/file_system.h                     |   3
10 files changed, 207 insertions(+), 0 deletions(-)
diff --git a/tensorflow/core/platform/cloud/expiring_lru_cache.h b/tensorflow/core/platform/cloud/expiring_lru_cache.h
index 3fc23a4306..c738497ddd 100644
--- a/tensorflow/core/platform/cloud/expiring_lru_cache.h
+++ b/tensorflow/core/platform/cloud/expiring_lru_cache.h
@@ -88,6 +88,13 @@ class ExpiringLRUCache {
     return s;
   }

+  /// Clear the cache.
+  void Clear() {
+    mutex_lock lock(mu_);
+    cache_.clear();
+    lru_list_.clear();
+  }
+
   /// Accessors for cache parameters.
   uint64 max_age() const { return max_age_; }
   size_t max_entries() const { return max_entries_; }
diff --git a/tensorflow/core/platform/cloud/expiring_lru_cache_test.cc b/tensorflow/core/platform/cloud/expiring_lru_cache_test.cc
index 8f8d5744a4..3bc6db3842 100644
--- a/tensorflow/core/platform/cloud/expiring_lru_cache_test.cc
+++ b/tensorflow/core/platform/cloud/expiring_lru_cache_test.cc
@@ -152,5 +152,27 @@ TEST(ExpiringLRUCacheTest, LookupOrCompute) {
   EXPECT_EQ(num_compute_calls, 6);
 }

+TEST(ExpiringLRUCacheTest, Clear) {
+  ExpiringLRUCache<int> cache(1, 4);
+  cache.Insert("a", 1);
+  cache.Insert("b", 2);
+  cache.Insert("c", 3);
+  cache.Insert("d", 4);
+  int value = 0;
+  EXPECT_TRUE(cache.Lookup("a", &value));
+  EXPECT_EQ(value, 1);
+  EXPECT_TRUE(cache.Lookup("b", &value));
+  EXPECT_EQ(value, 2);
+  EXPECT_TRUE(cache.Lookup("c", &value));
+  EXPECT_EQ(value, 3);
+  EXPECT_TRUE(cache.Lookup("d", &value));
+  EXPECT_EQ(value, 4);
+  cache.Clear();
+  EXPECT_FALSE(cache.Lookup("a", &value));
+  EXPECT_FALSE(cache.Lookup("b", &value));
+  EXPECT_FALSE(cache.Lookup("c", &value));
+  EXPECT_FALSE(cache.Lookup("d", &value));
+}
+
 }  // namespace
 }  // namespace tensorflow
diff --git a/tensorflow/core/platform/cloud/file_block_cache.cc b/tensorflow/core/platform/cloud/file_block_cache.cc
index 6831600fb1..0375af516b 100644
--- a/tensorflow/core/platform/cloud/file_block_cache.cc
+++ b/tensorflow/core/platform/cloud/file_block_cache.cc
@@ -237,6 +237,14 @@ void FileBlockCache::Prune() {
   }
 }

+void FileBlockCache::Flush() {
+  mutex_lock lock(mu_);
+  block_map_.clear();
+  lru_list_.clear();
+  lra_list_.clear();
+  cache_size_ = 0;
+}
+
 void FileBlockCache::RemoveFile(const string& filename) {
   mutex_lock lock(mu_);
   RemoveFile_Locked(filename);
diff --git a/tensorflow/core/platform/cloud/file_block_cache.h b/tensorflow/core/platform/cloud/file_block_cache.h
index 74e792a625..5c180e2332 100644
--- a/tensorflow/core/platform/cloud/file_block_cache.h
+++ b/tensorflow/core/platform/cloud/file_block_cache.h
@@ -90,6 +90,9 @@ class FileBlockCache {
   /// Remove all cached blocks for `filename`.
   void RemoveFile(const string& filename) LOCKS_EXCLUDED(mu_);

+  /// Remove all cached data.
+  void Flush() LOCKS_EXCLUDED(mu_);
+
   /// Accessors for cache parameters.
   size_t block_size() const { return block_size_; }
   size_t max_bytes() const { return max_bytes_; }
diff --git a/tensorflow/core/platform/cloud/file_block_cache_test.cc b/tensorflow/core/platform/cloud/file_block_cache_test.cc
index ae87e0de29..596fdbf19e 100644
--- a/tensorflow/core/platform/cloud/file_block_cache_test.cc
+++ b/tensorflow/core/platform/cloud/file_block_cache_test.cc
@@ -495,5 +495,25 @@ TEST(FileBlockCacheTest, CoalesceConcurrentReads) {
   EXPECT_EQ(1, num_requests);
 }
+
+TEST(FileBlockCacheTest, Flush) {
+  int calls = 0;
+  auto fetcher = [&calls](const string& filename, size_t offset, size_t n,
+                          char* buffer, size_t* bytes_transferred) {
+    calls++;
+    memset(buffer, 'x', n);
+    *bytes_transferred = n;
+    return Status::OK();
+  };
+  FileBlockCache cache(16, 32, 0, fetcher);
+  std::vector<char> out;
+  TF_EXPECT_OK(ReadCache(&cache, "", 0, 16, &out));
+  TF_EXPECT_OK(ReadCache(&cache, "", 0, 16, &out));
+  EXPECT_EQ(calls, 1);
+  cache.Flush();
+  TF_EXPECT_OK(ReadCache(&cache, "", 0, 16, &out));
+  EXPECT_EQ(calls, 2);
+}
+
 }  // namespace
 }  // namespace tensorflow
diff --git a/tensorflow/core/platform/cloud/gcs_file_system.cc b/tensorflow/core/platform/cloud/gcs_file_system.cc
index dffd1c4900..970a6b1dfd 100644
--- a/tensorflow/core/platform/cloud/gcs_file_system.cc
+++ b/tensorflow/core/platform/cloud/gcs_file_system.cc
@@ -1377,6 +1377,15 @@ Status GcsFileSystem::DeleteRecursively(const string& dirname,
   return Status::OK();
 }

+// Flushes all caches for filesystem metadata and file contents. Useful for
+// reclaiming memory once filesystem operations are done (e.g. model is loaded),
+// or for resetting the filesystem to a consistent state.
+void GcsFileSystem::FlushCaches() {
+  file_block_cache_->Flush();
+  stat_cache_->Clear();
+  matching_paths_cache_->Clear();
+}
+
 // Creates an HttpRequest and sets several parameters that are common to all
 // requests. All code (in GcsFileSystem) that creates an HttpRequest should
 // go through this method, rather than directly using http_request_factory_.
diff --git a/tensorflow/core/platform/cloud/gcs_file_system.h b/tensorflow/core/platform/cloud/gcs_file_system.h
index 731f97a4aa..adde161a93 100644
--- a/tensorflow/core/platform/cloud/gcs_file_system.h
+++ b/tensorflow/core/platform/cloud/gcs_file_system.h
@@ -84,6 +84,8 @@ class GcsFileSystem : public FileSystem {
   Status DeleteRecursively(const string& dirname, int64* undeleted_files,
                            int64* undeleted_dirs) override;

+  void FlushCaches() override;
+
   /// These accessors are mainly for testing purposes, to verify that the
   /// environment variables that control these parameters are handled correctly.
   size_t block_size() const { return file_block_cache_->block_size(); }
diff --git a/tensorflow/core/platform/cloud/gcs_file_system_test.cc b/tensorflow/core/platform/cloud/gcs_file_system_test.cc
index 32bd946a67..772aec5273 100644
--- a/tensorflow/core/platform/cloud/gcs_file_system_test.cc
+++ b/tensorflow/core/platform/cloud/gcs_file_system_test.cc
@@ -195,6 +195,49 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_WithBlockCache) {
EXPECT_EQ("0123", result);
}
+TEST(GcsFileSystemTest, NewRandomAccessFile_WithBlockCache_Flush) {
+ // Our underlying file in this test is a 15 byte file with contents
+ // "0123456789abcde".
+ std::vector<HttpRequest*> requests(
+ {new FakeHttpRequest(
+ "Uri: https://storage.googleapis.com/bucket/random_access.txt\n"
+ "Auth Token: fake_token\n"
+ "Range: 0-8\n"
+ "Timeouts: 5 1 20\n",
+ "012345678"),
+ new FakeHttpRequest(
+ "Uri: https://storage.googleapis.com/bucket/random_access.txt\n"
+ "Auth Token: fake_token\n"
+ "Range: 0-8\n"
+ "Timeouts: 5 1 20\n",
+ "012345678")});
+ GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider),
+ std::unique_ptr<HttpRequest::Factory>(
+ new FakeHttpRequestFactory(&requests)),
+ 9 /* block size */, 18 /* max bytes */,
+ 0 /* max staleness */, 0 /* stat cache max age */,
+ 0 /* stat cache max entries */,
+ 0 /* matching paths cache max age */,
+ 0 /* matching paths cache max entries */,
+ 0 /* initial retry delay */, kTestTimeoutConfig);
+
+ char scratch[100];
+ StringPiece result;
+ std::unique_ptr<RandomAccessFile> file;
+ TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
+ // Read the first chunk. The cache will be populated with the first block of
+ // 9 bytes.
+ scratch[5] = 'x';
+ TF_EXPECT_OK(file->Read(0, 4, &result, scratch));
+ EXPECT_EQ("0123", result);
+ EXPECT_EQ(scratch[5], 'x'); // Make sure we only copied 4 bytes.
+ // Flush caches and read the second chunk. This will be a cache miss, and
+ // the same block will be fetched again.
+ fs.FlushCaches();
+ TF_EXPECT_OK(file->Read(4, 4, &result, scratch));
+ EXPECT_EQ("4567", result);
+}
+
TEST(GcsFileSystemTest, NewRandomAccessFile_WithBlockCache_MaxStaleness) {
// Our underlying file in this test is a 16 byte file with contents
// "0123456789abcdef".
@@ -1270,6 +1313,50 @@ TEST(GcsFileSystemTest, GetMatchingPaths_Cache) {
   }
 }

+TEST(GcsFileSystemTest, GetMatchingPaths_Cache_Flush) {
+  std::vector<HttpRequest*> requests(
+      {new FakeHttpRequest(
+           "Uri: https://www.googleapis.com/storage/v1/b/bucket/o?"
+           "fields=items%2Fname%2CnextPageToken&prefix=path%2Fsubpath%2F\n"
+           "Auth Token: fake_token\n"
+           "Timeouts: 5 1 10\n",
+           "{\"items\": [ "
+           "  { \"name\": \"path/subpath/file2.txt\" }]}"),
+       new FakeHttpRequest(
+           "Uri: https://www.googleapis.com/storage/v1/b/bucket/o?"
+           "fields=items%2Fname%2CnextPageToken&prefix=path%2Fsubpath%2F\n"
+           "Auth Token: fake_token\n"
+           "Timeouts: 5 1 10\n",
+           "{\"items\": [ "
+           "  { \"name\": \"path/subpath/file2.txt\" }]}")});
+  GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider),
+                   std::unique_ptr<HttpRequest::Factory>(
+                       new FakeHttpRequestFactory(&requests)),
+                   0 /* block size */, 0 /* max bytes */, 0 /* max staleness */,
+                   0 /* stat cache max age */, 0 /* stat cache max entries */,
+                   3600 /* matching paths cache max age */,
+                   0 /* matching paths cache max entries */,
+                   0 /* initial retry delay*/, kTestTimeoutConfig);
+
+  // This loop should trigger the first HTTP request to GCS.
+  for (int i = 0; i < 10; i++) {
+    std::vector<string> result;
+    TF_EXPECT_OK(
+        fs.GetMatchingPaths("gs://bucket/path/subpath/file2.txt", &result));
+    EXPECT_EQ(std::vector<string>({"gs://bucket/path/subpath/file2.txt"}),
+              result);
+  }
+  // After flushing caches, there should be another (identical) request to GCS.
+  fs.FlushCaches();
+  for (int i = 0; i < 10; i++) {
+    std::vector<string> result;
+    TF_EXPECT_OK(
+        fs.GetMatchingPaths("gs://bucket/path/subpath/file2.txt", &result));
+    EXPECT_EQ(std::vector<string>({"gs://bucket/path/subpath/file2.txt"}),
+              result);
+  }
+}
+
 TEST(GcsFileSystemTest, DeleteFile) {
   std::vector<HttpRequest*> requests(
       {new FakeHttpRequest(
@@ -1895,6 +1982,50 @@ TEST(GcsFileSystemTest, Stat_Cache) {
   }
 }

+TEST(GcsFileSystemTest, Stat_Cache_Flush) {
+  std::vector<HttpRequest*> requests(
+      {new FakeHttpRequest(
+           "Uri: https://www.googleapis.com/storage/v1/b/bucket/o/"
+           "file.txt?fields=size%2Cupdated\n"
+           "Auth Token: fake_token\n"
+           "Timeouts: 5 1 10\n",
+           strings::StrCat("{\"size\": \"1010\","
+                           "\"updated\": \"2016-04-29T23:15:24.896Z\"}")),
+       new FakeHttpRequest(
+           "Uri: https://www.googleapis.com/storage/v1/b/bucket/o/"
+           "file.txt?fields=size%2Cupdated\n"
+           "Auth Token: fake_token\n"
+           "Timeouts: 5 1 10\n",
+           strings::StrCat("{\"size\": \"1010\","
+                           "\"updated\": \"2016-04-29T23:15:24.896Z\"}"))});
+  GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider),
+                   std::unique_ptr<HttpRequest::Factory>(
+                       new FakeHttpRequestFactory(&requests)),
+                   0 /* block size */, 0 /* max bytes */, 0 /* max staleness */,
+                   3600 /* stat cache max age */,
+                   0 /* stat cache max entries */,
+                   0 /* matching paths cache max age */,
+                   0 /* matching paths cache max entries */,
+                   0 /* initial retry delay*/, kTestTimeoutConfig);
+  // There should be a single HTTP request to GCS for fs.Stat in this loop.
+  for (int i = 0; i < 10; i++) {
+    FileStatistics stat;
+    TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", &stat));
+    EXPECT_EQ(1010, stat.length);
+    EXPECT_NEAR(1461971724896, stat.mtime_nsec / 1000 / 1000, 1);
+    EXPECT_FALSE(stat.is_directory);
+  }
+  // After flushing caches, there should be a second request to GCS for fs.Stat.
+  fs.FlushCaches();
+  for (int i = 0; i < 10; i++) {
+    FileStatistics stat;
+    TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", &stat));
+    EXPECT_EQ(1010, stat.length);
+    EXPECT_NEAR(1461971724896, stat.mtime_nsec / 1000 / 1000, 1);
+    EXPECT_FALSE(stat.is_directory);
+  }
+}
+
 TEST(GcsFileSystemTest, IsDirectory_NotFound) {
   std::vector<HttpRequest*> requests(
       {new FakeHttpRequest(
diff --git a/tensorflow/core/platform/file_system.cc b/tensorflow/core/platform/file_system.cc
index 938f5af487..14755891fa 100644
--- a/tensorflow/core/platform/file_system.cc
+++ b/tensorflow/core/platform/file_system.cc
@@ -73,6 +73,8 @@ Status FileSystem::IsDirectory(const string& name) {
   return Status(tensorflow::error::FAILED_PRECONDITION, "Not a directory");
 }

+void FileSystem::FlushCaches() {}
+
 RandomAccessFile::~RandomAccessFile() {}

 WritableFile::~WritableFile() {}
diff --git a/tensorflow/core/platform/file_system.h b/tensorflow/core/platform/file_system.h
index 903df96b58..d32efcea09 100644
--- a/tensorflow/core/platform/file_system.h
+++ b/tensorflow/core/platform/file_system.h
@@ -206,6 +206,9 @@ class FileSystem {
   /// * UNIMPLEMENTED - The file factory doesn't support directories.
   virtual Status IsDirectory(const string& fname);

+  /// \brief Flushes any cached filesystem objects from memory.
+  virtual void FlushCaches();
+
   FileSystem() {}

   virtual ~FileSystem();
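
For completeness, a hedged sketch (not from this commit) of flushing a concrete GcsFileSystem instance directly, e.g. from a test fixture or a maintenance hook; the helper name is made up for illustration:

  #include "tensorflow/core/platform/cloud/gcs_file_system.h"

  // Forces the next Read/Stat/GetMatchingPaths on this instance to hit GCS
  // again by clearing the file-block, stat, and matching-paths caches.
  void ResetGcsCaches(tensorflow::GcsFileSystem* gcs) {
    gcs->FlushCaches();
  }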