diff options
author | A. Unique TensorFlower <gardener@tensorflow.org> | 2017-07-18 15:54:54 -0700 |
---|---|---|
committer | TensorFlower Gardener <gardener@tensorflow.org> | 2017-07-18 16:00:37 -0700 |
commit | ed3b2bc8274046ad71630a060e615ad4f78becb9 (patch) | |
tree | 293c218f207e43605845e914e34717df4279bf5a | |
parent | afe603348babf6f055d635e230e1494dd138df21 (diff) |
FileBlockCache now runs a background thread that prunes files with expired blocks when the cache is configured with a nonzero max staleness.
PiperOrigin-RevId: 162416777
-rw-r--r-- | tensorflow/core/platform/cloud/file_block_cache.cc | 25 | ||||
-rw-r--r-- | tensorflow/core/platform/cloud/file_block_cache.h | 37 | ||||
-rw-r--r-- | tensorflow/core/platform/cloud/file_block_cache_test.cc | 91 |
3 files changed, 141 insertions, 12 deletions
diff --git a/tensorflow/core/platform/cloud/file_block_cache.cc b/tensorflow/core/platform/cloud/file_block_cache.cc index 27e023745e..e4970a4188 100644 --- a/tensorflow/core/platform/cloud/file_block_cache.cc +++ b/tensorflow/core/platform/cloud/file_block_cache.cc @@ -76,6 +76,8 @@ Status FileBlockCache::Read(const string& filename, size_t offset, size_t n, // Record the block timestamp, update the cache size, and add the block to // the cache. block->timestamp = env_->NowSeconds(); + lra_list_.push_front(key); + block->lra_iterator = lra_list_.begin(); cache_size_ += block->data.size(); entry = block_map_.emplace(std::make_pair(key, std::move(block))).first; } else { @@ -114,6 +116,28 @@ Status FileBlockCache::Read(const string& filename, size_t offset, size_t n, return Status::OK(); } +size_t FileBlockCache::CacheSize() const { + mutex_lock lock(mu_); + return cache_size_; +} + +void FileBlockCache::Prune() { + while (!WaitForNotificationWithTimeout(&stop_pruning_thread_, 1000000)) { + mutex_lock lock(mu_); + uint64 now = env_->NowSeconds(); + while (!lra_list_.empty()) { + auto it = block_map_.find(lra_list_.back()); + if (now - it->second->timestamp <= max_staleness_) { + // The oldest block is not yet expired. Come back later. + break; + } + // We need to make a copy of the filename here, since it could otherwise + // be used within RemoveFile_Locked after `it` is deleted. 
+ RemoveFile_Locked(std::string(it->first.first)); + } + } +} + void FileBlockCache::RemoveFile(const string& filename) { mutex_lock lock(mu_); RemoveFile_Locked(filename); @@ -131,6 +155,7 @@ void FileBlockCache::RemoveFile_Locked(const string& filename) { void FileBlockCache::RemoveBlock(BlockMap::iterator entry) { lru_list_.erase(entry->second->lru_iterator); + lra_list_.erase(entry->second->lra_iterator); cache_size_ -= entry->second->data.size(); block_map_.erase(entry); } diff --git a/tensorflow/core/platform/cloud/file_block_cache.h b/tensorflow/core/platform/cloud/file_block_cache.h index 02e00b5bc9..0429228a2b 100644 --- a/tensorflow/core/platform/cloud/file_block_cache.h +++ b/tensorflow/core/platform/cloud/file_block_cache.h @@ -26,6 +26,7 @@ limitations under the License. #include "tensorflow/core/lib/core/stringpiece.h" #include "tensorflow/core/platform/env.h" #include "tensorflow/core/platform/mutex.h" +#include "tensorflow/core/platform/notification.h" #include "tensorflow/core/platform/thread_annotations.h" #include "tensorflow/core/platform/types.h" @@ -52,7 +53,21 @@ class FileBlockCache { max_bytes_(max_bytes), max_staleness_(max_staleness), block_fetcher_(block_fetcher), - env_(env) {} + env_(env) { + if (max_staleness_ > 0) { + pruning_thread_.reset(env_->StartThread(ThreadOptions(), "TF_prune_FBC", + [this] { Prune(); })); + } + } + + ~FileBlockCache() { + if (pruning_thread_) { + stop_pruning_thread_.Notify(); + // Destroying pruning_thread_ will block until Prune() receives the above + // notification and returns. + pruning_thread_.reset(); + } + } /// Read `n` bytes from `filename` starting at `offset` into `out`. This /// method will return: @@ -79,6 +94,9 @@ class FileBlockCache { size_t max_bytes() const { return max_bytes_; } uint64 max_staleness() const { return max_staleness_; } + /// The current size (in bytes) of the cache. 
+ size_t CacheSize() const LOCKS_EXCLUDED(mu_); + private: /// The size of the blocks stored in the LRU cache, as well as the size of the /// reads from the underlying filesystem. @@ -107,6 +125,8 @@ class FileBlockCache { std::vector<char> data; /// A list iterator pointing to the block's position in the LRU list. std::list<Key>::iterator lru_iterator; + /// A list iterator pointing to the block's position in the LRA list. + std::list<Key>::iterator lra_iterator; /// The timestamp (seconds since epoch) at which the block was cached. uint64 timestamp; }; @@ -116,6 +136,9 @@ class FileBlockCache { /// The block map is an ordered map from Key to Block. typedef std::map<Key, std::unique_ptr<Block>> BlockMap; + /// Prune the cache by removing files with expired blocks. + void Prune() LOCKS_EXCLUDED(mu_); + /// Remove all blocks of a file, with mu_ already held. void RemoveFile_Locked(const string& filename) EXCLUSIVE_LOCKS_REQUIRED(mu_); @@ -123,8 +146,14 @@ class FileBlockCache { /// cache size accordingly. void RemoveBlock(BlockMap::iterator entry) EXCLUSIVE_LOCKS_REQUIRED(mu_); + /// The cache pruning thread that removes files with expired blocks. + std::unique_ptr<Thread> pruning_thread_; + + /// Notification for stopping the cache pruning thread. + Notification stop_pruning_thread_; + /// Guards access to the block map, LRU list, and cached byte count. - mutex mu_; + mutable mutex mu_; /// The block map (map from Key to Block). BlockMap block_map_ GUARDED_BY(mu_); @@ -133,6 +162,10 @@ class FileBlockCache { /// recently accessed block. std::list<Key> lru_list_ GUARDED_BY(mu_); + /// The LRA (least recently added) list of block keys. The front of the list + /// identifies the most recently added block. + std::list<Key> lra_list_ GUARDED_BY(mu_); + /// The combined number of bytes in all of the cached blocks. 
size_t cache_size_ GUARDED_BY(mu_) = 0; }; diff --git a/tensorflow/core/platform/cloud/file_block_cache_test.cc b/tensorflow/core/platform/cloud/file_block_cache_test.cc index 3f53cbc6ea..d01181daeb 100644 --- a/tensorflow/core/platform/cloud/file_block_cache_test.cc +++ b/tensorflow/core/platform/cloud/file_block_cache_test.cc @@ -22,6 +22,25 @@ limitations under the License. namespace tensorflow { namespace { +// This Env wrapper lets us control the NowSeconds() return value. +class FakeEnv : public EnvWrapper { + public: + FakeEnv() : EnvWrapper(Env::Default()) {} + + uint64 NowSeconds() override { + mutex_lock lock(mu_); + return now_; + } + + void SetNowSeconds(uint64 now) { + mutex_lock lock(mu_); + now_ = now; + } + + mutex mu_; + uint64 now_ = 1; +}; + TEST(FileBlockCacheTest, PassThrough) { const string want_filename = "foo/bar"; const size_t want_offset = 42; @@ -240,13 +259,6 @@ TEST(FileBlockCacheTest, LRU) { } TEST(FileBlockCacheTest, MaxStaleness) { - // This Env wrapper lets us control the NowSeconds() return value. - class FakeEnv : public EnvWrapper { - public: - FakeEnv() : EnvWrapper(Env::Default()) {} - uint64 NowSeconds() override { return now_; }; - uint64 now_ = 1; - }; int calls = 0; auto fetcher = [&calls](const string& filename, size_t offset, size_t n, std::vector<char>* out) { @@ -266,21 +278,21 @@ TEST(FileBlockCacheTest, MaxStaleness) { // count should advance every 3 seconds (i.e. every time the staleness is // greater than 2). for (int i = 1; i <= 10; i++) { - env->now_ = i + 1; + env->SetNowSeconds(i + 1); TF_EXPECT_OK(cache1.Read("", 0, 1, &out)); EXPECT_EQ(calls, 1 + i / 3); } // Now create a cache with max staleness of 0, and verify that it also works // as expected. calls = 0; - env->now_ = 0; + env->SetNowSeconds(0); FileBlockCache cache2(8, 16, 0 /* max staleness */, fetcher, env.get()); // Execute the first read to load the block. 
TF_EXPECT_OK(cache2.Read("", 0, 1, &out)); EXPECT_EQ(calls, 1); // Advance the clock by a huge amount and verify that the cached block is // used to satisfy the read. - env->now_ = 365 * 24 * 60 * 60; // ~1 year, just for fun. + env->SetNowSeconds(365 * 24 * 60 * 60); // ~1 year, just for fun. TF_EXPECT_OK(cache2.Read("", 0, 1, &out)); EXPECT_EQ(calls, 1); } @@ -347,5 +359,64 @@ TEST(FileBlockCacheTest, RemoveFile) { EXPECT_EQ(calls, 6); } +TEST(FileBlockCacheTest, Prune) { + int calls = 0; + auto fetcher = [&calls](const string& filename, size_t offset, size_t n, + std::vector<char>* out) { + calls++; + out->clear(); + out->resize(n, 'x'); + return Status::OK(); + }; + std::vector<char> out; + // Our fake environment is initialized with the current timestamp. + std::unique_ptr<FakeEnv> env(new FakeEnv); + uint64 now = Env::Default()->NowSeconds(); + env->SetNowSeconds(now); + FileBlockCache cache(8, 32, 1 /* max staleness */, fetcher, env.get()); + // Read three blocks into the cache, and advance the timestamp by one second + // with each read. Start with a block of "a" at the current timestamp `now`. + TF_EXPECT_OK(cache.Read("a", 0, 1, &out)); + // Now load a block of a different file "b" at timestamp `now` + 1 + env->SetNowSeconds(now + 1); + TF_EXPECT_OK(cache.Read("b", 0, 1, &out)); + // Now load a different block of file "a" at timestamp `now` + 1. When the + // first block of "a" expires, this block should also be removed because it + // also belongs to file "a". + TF_EXPECT_OK(cache.Read("a", 8, 1, &out)); + // Ensure that all blocks are in the cache (i.e. reads are cache hits). + EXPECT_EQ(cache.CacheSize(), 24); + EXPECT_EQ(calls, 3); + TF_EXPECT_OK(cache.Read("a", 0, 1, &out)); + TF_EXPECT_OK(cache.Read("b", 0, 1, &out)); + TF_EXPECT_OK(cache.Read("a", 8, 1, &out)); + EXPECT_EQ(calls, 3); + // Advance the fake timestamp so that "a" becomes stale via its first block. 
+ env->SetNowSeconds(now + 2); + // The pruning thread periodically compares env->NowSeconds() with the oldest + // block's timestamp to see if it should evict any files. At the current fake + // timestamp of `now` + 2, file "a" is stale because its first block is stale, + // but file "b" is not stale yet. Thus, once the pruning thread wakes up (in + // one second of wall time), it should remove "a" and leave "b" alone. + uint64 start = Env::Default()->NowSeconds(); + do { + Env::Default()->SleepForMicroseconds(100000); + } while (cache.CacheSize() == 24 && Env::Default()->NowSeconds() - start < 3); + // There should be one block left in the cache, and it should be the first + // block of "b". + EXPECT_EQ(cache.CacheSize(), 8); + TF_EXPECT_OK(cache.Read("b", 0, 1, &out)); + EXPECT_EQ(calls, 3); + // Advance the fake time to `now` + 3, at which point "b" becomes stale. + env->SetNowSeconds(now + 3); + // Wait for the pruner to remove "b". + start = Env::Default()->NowSeconds(); + do { + Env::Default()->SleepForMicroseconds(100000); + } while (cache.CacheSize() == 8 && Env::Default()->NowSeconds() - start < 3); + // The cache should now be empty. + EXPECT_EQ(cache.CacheSize(), 0); +} + } // namespace } // namespace tensorflow |