diff options
author | A. Unique TensorFlower <gardener@tensorflow.org> | 2018-01-04 18:17:49 -0800 |
---|---|---|
committer | TensorFlower Gardener <gardener@tensorflow.org> | 2018-01-04 18:21:42 -0800 |
commit | c9ed9bf846c6c8e8566082ce4ac201a529c23355 (patch) | |
tree | 7138a307c9baf54df8799884e8f9bed3a43d7b71 | |
parent | 4f877d4e54bb2427882f4a800607a1cf0531b293 (diff) |
Add a FlushCaches() method to the FileSystem interface, and provide an implementation for GcsFileSystem.
PiperOrigin-RevId: 180873963
-rw-r--r-- | tensorflow/core/platform/cloud/expiring_lru_cache.h | 7 | ||||
-rw-r--r-- | tensorflow/core/platform/cloud/expiring_lru_cache_test.cc | 22 | ||||
-rw-r--r-- | tensorflow/core/platform/cloud/file_block_cache.cc | 8 | ||||
-rw-r--r-- | tensorflow/core/platform/cloud/file_block_cache.h | 3 | ||||
-rw-r--r-- | tensorflow/core/platform/cloud/file_block_cache_test.cc | 20 | ||||
-rw-r--r-- | tensorflow/core/platform/cloud/gcs_file_system.cc | 9 | ||||
-rw-r--r-- | tensorflow/core/platform/cloud/gcs_file_system.h | 2 | ||||
-rw-r--r-- | tensorflow/core/platform/cloud/gcs_file_system_test.cc | 131 | ||||
-rw-r--r-- | tensorflow/core/platform/file_system.cc | 2 | ||||
-rw-r--r-- | tensorflow/core/platform/file_system.h | 3 |
10 files changed, 207 insertions, 0 deletions
diff --git a/tensorflow/core/platform/cloud/expiring_lru_cache.h b/tensorflow/core/platform/cloud/expiring_lru_cache.h index 3fc23a4306..c738497ddd 100644 --- a/tensorflow/core/platform/cloud/expiring_lru_cache.h +++ b/tensorflow/core/platform/cloud/expiring_lru_cache.h @@ -88,6 +88,13 @@ class ExpiringLRUCache { return s; } + /// Clear the cache. + void Clear() { + mutex_lock lock(mu_); + cache_.clear(); + lru_list_.clear(); + } + /// Accessors for cache parameters. uint64 max_age() const { return max_age_; } size_t max_entries() const { return max_entries_; } diff --git a/tensorflow/core/platform/cloud/expiring_lru_cache_test.cc b/tensorflow/core/platform/cloud/expiring_lru_cache_test.cc index 8f8d5744a4..3bc6db3842 100644 --- a/tensorflow/core/platform/cloud/expiring_lru_cache_test.cc +++ b/tensorflow/core/platform/cloud/expiring_lru_cache_test.cc @@ -152,5 +152,27 @@ TEST(ExpiringLRUCacheTest, LookupOrCompute) { EXPECT_EQ(num_compute_calls, 6); } +TEST(ExpiringLRUCacheTest, Clear) { + ExpiringLRUCache<int> cache(1, 4); + cache.Insert("a", 1); + cache.Insert("b", 2); + cache.Insert("c", 3); + cache.Insert("d", 4); + int value = 0; + EXPECT_TRUE(cache.Lookup("a", &value)); + EXPECT_EQ(value, 1); + EXPECT_TRUE(cache.Lookup("b", &value)); + EXPECT_EQ(value, 2); + EXPECT_TRUE(cache.Lookup("c", &value)); + EXPECT_EQ(value, 3); + EXPECT_TRUE(cache.Lookup("d", &value)); + EXPECT_EQ(value, 4); + cache.Clear(); + EXPECT_FALSE(cache.Lookup("a", &value)); + EXPECT_FALSE(cache.Lookup("b", &value)); + EXPECT_FALSE(cache.Lookup("c", &value)); + EXPECT_FALSE(cache.Lookup("d", &value)); +} + } // namespace } // namespace tensorflow diff --git a/tensorflow/core/platform/cloud/file_block_cache.cc b/tensorflow/core/platform/cloud/file_block_cache.cc index 6831600fb1..0375af516b 100644 --- a/tensorflow/core/platform/cloud/file_block_cache.cc +++ b/tensorflow/core/platform/cloud/file_block_cache.cc @@ -237,6 +237,14 @@ void FileBlockCache::Prune() { } } +void FileBlockCache::Flush() { + mutex_lock lock(mu_); + block_map_.clear(); + lru_list_.clear(); + lra_list_.clear(); + cache_size_ = 0; +} + void FileBlockCache::RemoveFile(const string& filename) { mutex_lock lock(mu_); RemoveFile_Locked(filename); diff --git a/tensorflow/core/platform/cloud/file_block_cache.h b/tensorflow/core/platform/cloud/file_block_cache.h index 74e792a625..5c180e2332 100644 --- a/tensorflow/core/platform/cloud/file_block_cache.h +++ b/tensorflow/core/platform/cloud/file_block_cache.h @@ -90,6 +90,9 @@ class FileBlockCache { /// Remove all cached blocks for `filename`. void RemoveFile(const string& filename) LOCKS_EXCLUDED(mu_); + /// Remove all cached data. + void Flush() LOCKS_EXCLUDED(mu_); + /// Accessors for cache parameters. size_t block_size() const { return block_size_; } size_t max_bytes() const { return max_bytes_; } diff --git a/tensorflow/core/platform/cloud/file_block_cache_test.cc b/tensorflow/core/platform/cloud/file_block_cache_test.cc index ae87e0de29..596fdbf19e 100644 --- a/tensorflow/core/platform/cloud/file_block_cache_test.cc +++ b/tensorflow/core/platform/cloud/file_block_cache_test.cc @@ -495,5 +495,25 @@ TEST(FileBlockCacheTest, CoalesceConcurrentReads) { EXPECT_EQ(1, num_requests); } + +TEST(FileBlockCacheTest, Flush) { + int calls = 0; + auto fetcher = [&calls](const string& filename, size_t offset, size_t n, + char* buffer, size_t* bytes_transferred) { + calls++; + memset(buffer, 'x', n); + *bytes_transferred = n; + return Status::OK(); + }; + FileBlockCache cache(16, 32, 0, fetcher); + std::vector<char> out; + TF_EXPECT_OK(ReadCache(&cache, "", 0, 16, &out)); + TF_EXPECT_OK(ReadCache(&cache, "", 0, 16, &out)); + EXPECT_EQ(calls, 1); + cache.Flush(); + TF_EXPECT_OK(ReadCache(&cache, "", 0, 16, &out)); + EXPECT_EQ(calls, 2); +} + } // namespace } // namespace tensorflow diff --git a/tensorflow/core/platform/cloud/gcs_file_system.cc b/tensorflow/core/platform/cloud/gcs_file_system.cc index dffd1c4900..970a6b1dfd 100644 --- a/tensorflow/core/platform/cloud/gcs_file_system.cc +++ b/tensorflow/core/platform/cloud/gcs_file_system.cc @@ -1377,6 +1377,15 @@ Status GcsFileSystem::DeleteRecursively(const string& dirname, return Status::OK(); } +// Flushes all caches for filesystem metadata and file contents. Useful for +// reclaiming memory once filesystem operations are done (e.g. model is loaded), +// or for resetting the filesystem to a consistent state. +void GcsFileSystem::FlushCaches() { + file_block_cache_->Flush(); + stat_cache_->Clear(); + matching_paths_cache_->Clear(); +} + // Creates an HttpRequest and sets several parameters that are common to all // requests. All code (in GcsFileSystem) that creates an HttpRequest should // go through this method, rather than directly using http_request_factory_. diff --git a/tensorflow/core/platform/cloud/gcs_file_system.h b/tensorflow/core/platform/cloud/gcs_file_system.h index 731f97a4aa..adde161a93 100644 --- a/tensorflow/core/platform/cloud/gcs_file_system.h +++ b/tensorflow/core/platform/cloud/gcs_file_system.h @@ -84,6 +84,8 @@ class GcsFileSystem : public FileSystem { Status DeleteRecursively(const string& dirname, int64* undeleted_files, int64* undeleted_dirs) override; + void FlushCaches() override; + /// These accessors are mainly for testing purposes, to verify that the /// environment variables that control these parameters are handled correctly. size_t block_size() const { return file_block_cache_->block_size(); } diff --git a/tensorflow/core/platform/cloud/gcs_file_system_test.cc b/tensorflow/core/platform/cloud/gcs_file_system_test.cc index 32bd946a67..772aec5273 100644 --- a/tensorflow/core/platform/cloud/gcs_file_system_test.cc +++ b/tensorflow/core/platform/cloud/gcs_file_system_test.cc @@ -195,6 +195,49 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_WithBlockCache) { EXPECT_EQ("0123", result); } +TEST(GcsFileSystemTest, NewRandomAccessFile_WithBlockCache_Flush) { + // Our underlying file in this test is a 15 byte file with contents + // "0123456789abcde". + std::vector<HttpRequest*> requests( + {new FakeHttpRequest( + "Uri: https://storage.googleapis.com/bucket/random_access.txt\n" + "Auth Token: fake_token\n" + "Range: 0-8\n" + "Timeouts: 5 1 20\n", + "012345678"), + new FakeHttpRequest( + "Uri: https://storage.googleapis.com/bucket/random_access.txt\n" + "Auth Token: fake_token\n" + "Range: 0-8\n" + "Timeouts: 5 1 20\n", + "012345678")}); + GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), + std::unique_ptr<HttpRequest::Factory>( + new FakeHttpRequestFactory(&requests)), + 9 /* block size */, 18 /* max bytes */, + 0 /* max staleness */, 0 /* stat cache max age */, + 0 /* stat cache max entries */, + 0 /* matching paths cache max age */, + 0 /* matching paths cache max entries */, + 0 /* initial retry delay */, kTestTimeoutConfig); + + char scratch[100]; + StringPiece result; + std::unique_ptr<RandomAccessFile> file; + TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file)); + // Read the first chunk. The cache will be populated with the first block of + // 9 bytes. + scratch[5] = 'x'; + TF_EXPECT_OK(file->Read(0, 4, &result, scratch)); + EXPECT_EQ("0123", result); + EXPECT_EQ(scratch[5], 'x'); // Make sure we only copied 4 bytes. + // Flush caches and read the second chunk. This will be a cache miss, and + // the same block will be fetched again. + fs.FlushCaches(); + TF_EXPECT_OK(file->Read(4, 4, &result, scratch)); + EXPECT_EQ("4567", result); +} + TEST(GcsFileSystemTest, NewRandomAccessFile_WithBlockCache_MaxStaleness) { // Our underlying file in this test is a 16 byte file with contents // "0123456789abcdef". @@ -1270,6 +1313,50 @@ TEST(GcsFileSystemTest, GetMatchingPaths_Cache) { } } +TEST(GcsFileSystemTest, GetMatchingPaths_Cache_Flush) { + std::vector<HttpRequest*> requests( + {new FakeHttpRequest( + "Uri: https://www.googleapis.com/storage/v1/b/bucket/o?" + "fields=items%2Fname%2CnextPageToken&prefix=path%2Fsubpath%2F\n" + "Auth Token: fake_token\n" + "Timeouts: 5 1 10\n", + "{\"items\": [ " + " { \"name\": \"path/subpath/file2.txt\" }]}"), + new FakeHttpRequest( + "Uri: https://www.googleapis.com/storage/v1/b/bucket/o?" + "fields=items%2Fname%2CnextPageToken&prefix=path%2Fsubpath%2F\n" + "Auth Token: fake_token\n" + "Timeouts: 5 1 10\n", + "{\"items\": [ " + " { \"name\": \"path/subpath/file2.txt\" }]}")}); + GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), + std::unique_ptr<HttpRequest::Factory>( + new FakeHttpRequestFactory(&requests)), + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* stat cache max age */, 0 /* stat cache max entries */, + 3600 /* matching paths cache max age */, + 0 /* matching paths cache max entries */, + 0 /* initial retry delay*/, kTestTimeoutConfig); + + // This loop should trigger the first HTTP request to GCS. + for (int i = 0; i < 10; i++) { + std::vector<string> result; + TF_EXPECT_OK( + fs.GetMatchingPaths("gs://bucket/path/subpath/file2.txt", &result)); + EXPECT_EQ(std::vector<string>({"gs://bucket/path/subpath/file2.txt"}), + result); + } + // After flushing caches, there should be another (identical) request to GCS. + fs.FlushCaches(); + for (int i = 0; i < 10; i++) { + std::vector<string> result; + TF_EXPECT_OK( + fs.GetMatchingPaths("gs://bucket/path/subpath/file2.txt", &result)); + EXPECT_EQ(std::vector<string>({"gs://bucket/path/subpath/file2.txt"}), + result); + } +} + TEST(GcsFileSystemTest, DeleteFile) { std::vector<HttpRequest*> requests( {new FakeHttpRequest( @@ -1895,6 +1982,50 @@ TEST(GcsFileSystemTest, Stat_Cache) { } } +TEST(GcsFileSystemTest, Stat_Cache_Flush) { + std::vector<HttpRequest*> requests( + {new FakeHttpRequest( + "Uri: https://www.googleapis.com/storage/v1/b/bucket/o/" + "file.txt?fields=size%2Cupdated\n" + "Auth Token: fake_token\n" + "Timeouts: 5 1 10\n", + strings::StrCat("{\"size\": \"1010\"," + "\"updated\": \"2016-04-29T23:15:24.896Z\"}")), + new FakeHttpRequest( + "Uri: https://www.googleapis.com/storage/v1/b/bucket/o/" + "file.txt?fields=size%2Cupdated\n" + "Auth Token: fake_token\n" + "Timeouts: 5 1 10\n", + strings::StrCat("{\"size\": \"1010\"," + "\"updated\": \"2016-04-29T23:15:24.896Z\"}"))}); + GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), + std::unique_ptr<HttpRequest::Factory>( + new FakeHttpRequestFactory(&requests)), + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 3600 /* stat cache max age */, + 0 /* stat cache max entries */, + 0 /* matching paths cache max age */, + 0 /* matching paths cache max entries */, + 0 /* initial retry delay*/, kTestTimeoutConfig); + // There should be a single HTTP request to GCS for fs.Stat in this loop. + for (int i = 0; i < 10; i++) { + FileStatistics stat; + TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", &stat)); + EXPECT_EQ(1010, stat.length); + EXPECT_NEAR(1461971724896, stat.mtime_nsec / 1000 / 1000, 1); + EXPECT_FALSE(stat.is_directory); + } + // After flushing caches, there should be a second request to GCS for fs.Stat. + fs.FlushCaches(); + for (int i = 0; i < 10; i++) { + FileStatistics stat; + TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", &stat)); + EXPECT_EQ(1010, stat.length); + EXPECT_NEAR(1461971724896, stat.mtime_nsec / 1000 / 1000, 1); + EXPECT_FALSE(stat.is_directory); + } +} + TEST(GcsFileSystemTest, IsDirectory_NotFound) { std::vector<HttpRequest*> requests( {new FakeHttpRequest( diff --git a/tensorflow/core/platform/file_system.cc b/tensorflow/core/platform/file_system.cc index 938f5af487..14755891fa 100644 --- a/tensorflow/core/platform/file_system.cc +++ b/tensorflow/core/platform/file_system.cc @@ -73,6 +73,8 @@ Status FileSystem::IsDirectory(const string& name) { return Status(tensorflow::error::FAILED_PRECONDITION, "Not a directory"); } +void FileSystem::FlushCaches() {} + RandomAccessFile::~RandomAccessFile() {} WritableFile::~WritableFile() {} diff --git a/tensorflow/core/platform/file_system.h b/tensorflow/core/platform/file_system.h index 903df96b58..d32efcea09 100644 --- a/tensorflow/core/platform/file_system.h +++ b/tensorflow/core/platform/file_system.h @@ -206,6 +206,9 @@ class FileSystem { /// * UNIMPLEMENTED - The file factory doesn't support directories. virtual Status IsDirectory(const string& fname); + /// \brief Flushes any cached filesystem objects from memory. + virtual void FlushCaches(); + FileSystem() {} virtual ~FileSystem(); |