aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar A. Unique TensorFlower <gardener@tensorflow.org>2017-07-18 15:54:54 -0700
committerGravatar TensorFlower Gardener <gardener@tensorflow.org>2017-07-18 16:00:37 -0700
commited3b2bc8274046ad71630a060e615ad4f78becb9 (patch)
tree293c218f207e43605845e914e34717df4279bf5a
parentafe603348babf6f055d635e230e1494dd138df21 (diff)
FileBlockCache will now run a thread to prune files with expired blocks, if configured with a nonzero max staleness.
PiperOrigin-RevId: 162416777
-rw-r--r--tensorflow/core/platform/cloud/file_block_cache.cc25
-rw-r--r--tensorflow/core/platform/cloud/file_block_cache.h37
-rw-r--r--tensorflow/core/platform/cloud/file_block_cache_test.cc91
3 files changed, 141 insertions, 12 deletions
diff --git a/tensorflow/core/platform/cloud/file_block_cache.cc b/tensorflow/core/platform/cloud/file_block_cache.cc
index 27e023745e..e4970a4188 100644
--- a/tensorflow/core/platform/cloud/file_block_cache.cc
+++ b/tensorflow/core/platform/cloud/file_block_cache.cc
@@ -76,6 +76,8 @@ Status FileBlockCache::Read(const string& filename, size_t offset, size_t n,
// Record the block timestamp, update the cache size, and add the block to
// the cache.
block->timestamp = env_->NowSeconds();
+ lra_list_.push_front(key);
+ block->lra_iterator = lra_list_.begin();
cache_size_ += block->data.size();
entry = block_map_.emplace(std::make_pair(key, std::move(block))).first;
} else {
@@ -114,6 +116,28 @@ Status FileBlockCache::Read(const string& filename, size_t offset, size_t n,
return Status::OK();
}
+size_t FileBlockCache::CacheSize() const {
+ mutex_lock lock(mu_);
+ return cache_size_;
+}
+
+void FileBlockCache::Prune() {
+ while (!WaitForNotificationWithTimeout(&stop_pruning_thread_, 1000000)) {
+ mutex_lock lock(mu_);
+ uint64 now = env_->NowSeconds();
+ while (!lra_list_.empty()) {
+ auto it = block_map_.find(lra_list_.back());
+ if (now - it->second->timestamp <= max_staleness_) {
+ // The oldest block is not yet expired. Come back later.
+ break;
+ }
+ // We need to make a copy of the filename here, since it could otherwise
+ // be used within RemoveFile_Locked after `it` is deleted.
+ RemoveFile_Locked(std::string(it->first.first));
+ }
+ }
+}
+
void FileBlockCache::RemoveFile(const string& filename) {
mutex_lock lock(mu_);
RemoveFile_Locked(filename);
@@ -131,6 +155,7 @@ void FileBlockCache::RemoveFile_Locked(const string& filename) {
void FileBlockCache::RemoveBlock(BlockMap::iterator entry) {
lru_list_.erase(entry->second->lru_iterator);
+ lra_list_.erase(entry->second->lra_iterator);
cache_size_ -= entry->second->data.size();
block_map_.erase(entry);
}
diff --git a/tensorflow/core/platform/cloud/file_block_cache.h b/tensorflow/core/platform/cloud/file_block_cache.h
index 02e00b5bc9..0429228a2b 100644
--- a/tensorflow/core/platform/cloud/file_block_cache.h
+++ b/tensorflow/core/platform/cloud/file_block_cache.h
@@ -26,6 +26,7 @@ limitations under the License.
#include "tensorflow/core/lib/core/stringpiece.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/mutex.h"
+#include "tensorflow/core/platform/notification.h"
#include "tensorflow/core/platform/thread_annotations.h"
#include "tensorflow/core/platform/types.h"
@@ -52,7 +53,21 @@ class FileBlockCache {
max_bytes_(max_bytes),
max_staleness_(max_staleness),
block_fetcher_(block_fetcher),
- env_(env) {}
+ env_(env) {
+ if (max_staleness_ > 0) {
+ pruning_thread_.reset(env_->StartThread(ThreadOptions(), "TF_prune_FBC",
+ [this] { Prune(); }));
+ }
+ }
+
+ ~FileBlockCache() {
+ if (pruning_thread_) {
+ stop_pruning_thread_.Notify();
+ // Destroying pruning_thread_ will block until Prune() receives the above
+ // notification and returns.
+ pruning_thread_.reset();
+ }
+ }
/// Read `n` bytes from `filename` starting at `offset` into `out`. This
/// method will return:
@@ -79,6 +94,9 @@ class FileBlockCache {
size_t max_bytes() const { return max_bytes_; }
uint64 max_staleness() const { return max_staleness_; }
+ /// The current size (in bytes) of the cache.
+ size_t CacheSize() const LOCKS_EXCLUDED(mu_);
+
private:
/// The size of the blocks stored in the LRU cache, as well as the size of the
/// reads from the underlying filesystem.
@@ -107,6 +125,8 @@ class FileBlockCache {
std::vector<char> data;
/// A list iterator pointing to the block's position in the LRU list.
std::list<Key>::iterator lru_iterator;
+ /// A list iterator pointing to the block's position in the LRA list.
+ std::list<Key>::iterator lra_iterator;
/// The timestamp (seconds since epoch) at which the block was cached.
uint64 timestamp;
};
@@ -116,6 +136,9 @@ class FileBlockCache {
/// The block map is an ordered map from Key to Block.
typedef std::map<Key, std::unique_ptr<Block>> BlockMap;
+ /// Prune the cache by removing files with expired blocks.
+ void Prune() LOCKS_EXCLUDED(mu_);
+
/// Remove all blocks of a file, with mu_ already held.
void RemoveFile_Locked(const string& filename) EXCLUSIVE_LOCKS_REQUIRED(mu_);
@@ -123,8 +146,14 @@ class FileBlockCache {
/// cache size accordingly.
void RemoveBlock(BlockMap::iterator entry) EXCLUSIVE_LOCKS_REQUIRED(mu_);
+ /// The cache pruning thread that removes files with expired blocks.
+ std::unique_ptr<Thread> pruning_thread_;
+
+ /// Notification for stopping the cache pruning thread.
+ Notification stop_pruning_thread_;
+
/// Guards access to the block map, LRU list, and cached byte count.
- mutex mu_;
+ mutable mutex mu_;
/// The block map (map from Key to Block).
BlockMap block_map_ GUARDED_BY(mu_);
@@ -133,6 +162,10 @@ class FileBlockCache {
/// recently accessed block.
std::list<Key> lru_list_ GUARDED_BY(mu_);
+ /// The LRA (least recently added) list of block keys. The front of the list
+ /// identifies the most recently added block.
+ std::list<Key> lra_list_ GUARDED_BY(mu_);
+
/// The combined number of bytes in all of the cached blocks.
size_t cache_size_ GUARDED_BY(mu_) = 0;
};
diff --git a/tensorflow/core/platform/cloud/file_block_cache_test.cc b/tensorflow/core/platform/cloud/file_block_cache_test.cc
index 3f53cbc6ea..d01181daeb 100644
--- a/tensorflow/core/platform/cloud/file_block_cache_test.cc
+++ b/tensorflow/core/platform/cloud/file_block_cache_test.cc
@@ -22,6 +22,25 @@ limitations under the License.
namespace tensorflow {
namespace {
+// This Env wrapper lets us control the NowSeconds() return value.
+class FakeEnv : public EnvWrapper {
+ public:
+ FakeEnv() : EnvWrapper(Env::Default()) {}
+
+ uint64 NowSeconds() override {
+ mutex_lock lock(mu_);
+ return now_;
+ }
+
+ void SetNowSeconds(uint64 now) {
+ mutex_lock lock(mu_);
+ now_ = now;
+ }
+
+ mutex mu_;
+ uint64 now_ = 1;
+};
+
TEST(FileBlockCacheTest, PassThrough) {
const string want_filename = "foo/bar";
const size_t want_offset = 42;
@@ -240,13 +259,6 @@ TEST(FileBlockCacheTest, LRU) {
}
TEST(FileBlockCacheTest, MaxStaleness) {
- // This Env wrapper lets us control the NowSeconds() return value.
- class FakeEnv : public EnvWrapper {
- public:
- FakeEnv() : EnvWrapper(Env::Default()) {}
- uint64 NowSeconds() override { return now_; };
- uint64 now_ = 1;
- };
int calls = 0;
auto fetcher = [&calls](const string& filename, size_t offset, size_t n,
std::vector<char>* out) {
@@ -266,21 +278,21 @@ TEST(FileBlockCacheTest, MaxStaleness) {
// count should advance every 3 seconds (i.e. every time the staleness is
// greater than 2).
for (int i = 1; i <= 10; i++) {
- env->now_ = i + 1;
+ env->SetNowSeconds(i + 1);
TF_EXPECT_OK(cache1.Read("", 0, 1, &out));
EXPECT_EQ(calls, 1 + i / 3);
}
// Now create a cache with max staleness of 0, and verify that it also works
// as expected.
calls = 0;
- env->now_ = 0;
+ env->SetNowSeconds(0);
FileBlockCache cache2(8, 16, 0 /* max staleness */, fetcher, env.get());
// Execute the first read to load the block.
TF_EXPECT_OK(cache2.Read("", 0, 1, &out));
EXPECT_EQ(calls, 1);
// Advance the clock by a huge amount and verify that the cached block is
// used to satisfy the read.
- env->now_ = 365 * 24 * 60 * 60; // ~1 year, just for fun.
+ env->SetNowSeconds(365 * 24 * 60 * 60); // ~1 year, just for fun.
TF_EXPECT_OK(cache2.Read("", 0, 1, &out));
EXPECT_EQ(calls, 1);
}
@@ -347,5 +359,64 @@ TEST(FileBlockCacheTest, RemoveFile) {
EXPECT_EQ(calls, 6);
}
+TEST(FileBlockCacheTest, Prune) {
+ int calls = 0;
+ auto fetcher = [&calls](const string& filename, size_t offset, size_t n,
+ std::vector<char>* out) {
+ calls++;
+ out->clear();
+ out->resize(n, 'x');
+ return Status::OK();
+ };
+ std::vector<char> out;
+ // Our fake environment is initialized with the current timestamp.
+ std::unique_ptr<FakeEnv> env(new FakeEnv);
+ uint64 now = Env::Default()->NowSeconds();
+ env->SetNowSeconds(now);
+ FileBlockCache cache(8, 32, 1 /* max staleness */, fetcher, env.get());
+ // Read three blocks into the cache, and advance the timestamp by one second
+ // with each read. Start with a block of "a" at the current timestamp `now`.
+ TF_EXPECT_OK(cache.Read("a", 0, 1, &out));
+ // Now load a block of a different file "b" at timestamp `now` + 1
+ env->SetNowSeconds(now + 1);
+ TF_EXPECT_OK(cache.Read("b", 0, 1, &out));
+ // Now load a different block of file "a" at timestamp `now` + 1. When the
+ // first block of "a" expires, this block should also be removed because it
+ // also belongs to file "a".
+ TF_EXPECT_OK(cache.Read("a", 8, 1, &out));
+ // Ensure that all blocks are in the cache (i.e. reads are cache hits).
+ EXPECT_EQ(cache.CacheSize(), 24);
+ EXPECT_EQ(calls, 3);
+ TF_EXPECT_OK(cache.Read("a", 0, 1, &out));
+ TF_EXPECT_OK(cache.Read("b", 0, 1, &out));
+ TF_EXPECT_OK(cache.Read("a", 8, 1, &out));
+ EXPECT_EQ(calls, 3);
+ // Advance the fake timestamp so that "a" becomes stale via its first block.
+ env->SetNowSeconds(now + 2);
+ // The pruning thread periodically compares env->NowSeconds() with the oldest
+ // block's timestamp to see if it should evict any files. At the current fake
+ // timestamp of `now` + 2, file "a" is stale because its first block is stale,
+ // but file "b" is not stale yet. Thus, once the pruning thread wakes up (in
+ // one second of wall time), it should remove "a" and leave "b" alone.
+ uint64 start = Env::Default()->NowSeconds();
+ do {
+ Env::Default()->SleepForMicroseconds(100000);
+ } while (cache.CacheSize() == 24 && Env::Default()->NowSeconds() - start < 3);
+ // There should be one block left in the cache, and it should be the first
+ // block of "b".
+ EXPECT_EQ(cache.CacheSize(), 8);
+ TF_EXPECT_OK(cache.Read("b", 0, 1, &out));
+ EXPECT_EQ(calls, 3);
+ // Advance the fake time to `now` + 3, at which point "b" becomes stale.
+ env->SetNowSeconds(now + 3);
+ // Wait for the pruner to remove "b".
+ start = Env::Default()->NowSeconds();
+ do {
+ Env::Default()->SleepForMicroseconds(100000);
+ } while (cache.CacheSize() == 8 && Env::Default()->NowSeconds() - start < 3);
+ // The cache should now be empty.
+ EXPECT_EQ(cache.CacheSize(), 0);
+}
+
} // namespace
} // namespace tensorflow