aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Brennan Saeta <saeta@google.com>2018-03-02 12:19:23 -0800
committerGravatar TensorFlower Gardener <gardener@tensorflow.org>2018-03-02 12:23:04 -0800
commit41aa3e75ca35c763c23aeedf2409589b7814c7f1 (patch)
treea942b94371716b224107e779e3ad529d4936daba
parent2abc47106624e0102c917535dd6df45561550ade (diff)
GCS: Extract block cache interface from implementation.
PiperOrigin-RevId: 187652953
-rw-r--r--tensorflow/core/platform/cloud/BUILD20
-rw-r--r--tensorflow/core/platform/cloud/file_block_cache.h161
-rw-r--r--tensorflow/core/platform/cloud/gcs_file_system.cc15
-rw-r--r--tensorflow/core/platform/cloud/ram_file_block_cache.cc (renamed from tensorflow/core/platform/cloud/file_block_cache.cc)35
-rw-r--r--tensorflow/core/platform/cloud/ram_file_block_cache.h229
-rw-r--r--tensorflow/core/platform/cloud/ram_file_block_cache_test.cc (renamed from tensorflow/core/platform/cloud/file_block_cache_test.cc)60
6 files changed, 311 insertions, 209 deletions
diff --git a/tensorflow/core/platform/cloud/BUILD b/tensorflow/core/platform/cloud/BUILD
index 9ba25dea4f..0a17a419d3 100644
--- a/tensorflow/core/platform/cloud/BUILD
+++ b/tensorflow/core/platform/cloud/BUILD
@@ -38,7 +38,6 @@ cc_library(
cc_library(
name = "file_block_cache",
- srcs = ["file_block_cache.cc"],
hdrs = ["file_block_cache.h"],
copts = tf_copts(),
visibility = ["//tensorflow:__subpackages__"],
@@ -46,6 +45,18 @@ cc_library(
)
cc_library(
+ name = "ram_file_block_cache",
+ srcs = ["ram_file_block_cache.cc"],
+ hdrs = ["ram_file_block_cache.h"],
+ copts = tf_copts(),
+ visibility = ["//tensorflow:__subpackages__"],
+ deps = [
+ ":file_block_cache",
+ "//tensorflow/core:lib",
+ ],
+)
+
+cc_library(
name = "gcs_dns_cache",
srcs = ["gcs_dns_cache.cc"],
hdrs = ["gcs_dns_cache.h"],
@@ -83,6 +94,7 @@ cc_library(
":gcs_throttle",
":google_auth_provider",
":http_request",
+ ":ram_file_block_cache",
":retrying_file_system",
":retrying_utils",
":time_util",
@@ -245,12 +257,12 @@ tf_cc_test(
)
tf_cc_test(
- name = "file_block_cache_test",
+ name = "ram_file_block_cache_test",
size = "small",
- srcs = ["file_block_cache_test.cc"],
+ srcs = ["ram_file_block_cache_test.cc"],
deps = [
- ":file_block_cache",
":now_seconds_env",
+ ":ram_file_block_cache",
"//tensorflow/core:lib",
"//tensorflow/core:lib_internal",
"//tensorflow/core:test",
diff --git a/tensorflow/core/platform/cloud/file_block_cache.h b/tensorflow/core/platform/cloud/file_block_cache.h
index 5c180e2332..da16788247 100644
--- a/tensorflow/core/platform/cloud/file_block_cache.h
+++ b/tensorflow/core/platform/cloud/file_block_cache.h
@@ -32,7 +32,7 @@ limitations under the License.
namespace tensorflow {
-/// \brief An LRU block cache of file contents, keyed by {filename, offset}.
+/// \brief A block cache of file contents, keyed by {filename, offset}.
///
/// This class should be shared by read-only random access files on a remote
/// filesystem (e.g. GCS).
@@ -48,27 +48,7 @@ class FileBlockCache {
size_t* bytes_transferred)>
BlockFetcher;
- FileBlockCache(size_t block_size, size_t max_bytes, uint64 max_staleness,
- BlockFetcher block_fetcher, Env* env = Env::Default())
- : block_size_(block_size),
- max_bytes_(max_bytes),
- max_staleness_(max_staleness),
- block_fetcher_(block_fetcher),
- env_(env) {
- if (max_staleness_ > 0) {
- pruning_thread_.reset(env_->StartThread(ThreadOptions(), "TF_prune_FBC",
- [this] { Prune(); }));
- }
- }
-
- ~FileBlockCache() {
- if (pruning_thread_) {
- stop_pruning_thread_.Notify();
- // Destroying pruning_thread_ will block until Prune() receives the above
- // notification and returns.
- pruning_thread_.reset();
- }
- }
+ virtual ~FileBlockCache() {}
/// Read `n` bytes from `filename` starting at `offset` into `out`. This
/// method will return:
@@ -84,143 +64,22 @@ class FileBlockCache {
/// placed in `out`.
/// 4) OK otherwise (i.e. the read succeeded, and at least one byte was placed
/// in `out`).
- Status Read(const string& filename, size_t offset, size_t n, char* buffer,
- size_t* bytes_transferred);
+ virtual Status Read(const string& filename, size_t offset, size_t n,
+ char* buffer, size_t* bytes_transferred) = 0;
/// Remove all cached blocks for `filename`.
- void RemoveFile(const string& filename) LOCKS_EXCLUDED(mu_);
+ virtual void RemoveFile(const string& filename) = 0;
/// Remove all cached data.
- void Flush() LOCKS_EXCLUDED(mu_);
+ virtual void Flush() = 0;
/// Accessors for cache parameters.
- size_t block_size() const { return block_size_; }
- size_t max_bytes() const { return max_bytes_; }
- uint64 max_staleness() const { return max_staleness_; }
+ virtual size_t block_size() const = 0;
+ virtual size_t max_bytes() const = 0;
+ virtual uint64 max_staleness() const = 0;
/// The current size (in bytes) of the cache.
- size_t CacheSize() const LOCKS_EXCLUDED(mu_);
-
- private:
- /// The size of the blocks stored in the LRU cache, as well as the size of the
- /// reads from the underlying filesystem.
- const size_t block_size_;
- /// The maximum number of bytes (sum of block sizes) allowed in the LRU cache.
- const size_t max_bytes_;
- /// The maximum staleness of any block in the LRU cache, in seconds.
- const uint64 max_staleness_;
- /// The callback to read a block from the underlying filesystem.
- const BlockFetcher block_fetcher_;
- /// The Env from which we read timestamps.
- Env* const env_; // not owned
-
- /// \brief The key type for the file block cache.
- ///
- /// The file block cache key is a {filename, offset} pair.
- typedef std::pair<string, size_t> Key;
-
- /// \brief The state of a block.
- ///
- /// A block begins in the CREATED stage. The first thread will attempt to read
- /// the block from the filesystem, transitioning the state of the block to
- /// FETCHING. After completing, if the read was successful the state should
- /// be FINISHED. Otherwise the state should be ERROR. A subsequent read can
- /// re-fetch the block if the state is ERROR.
- enum class FetchState {
- CREATED,
- FETCHING,
- FINISHED,
- ERROR,
- };
-
- /// \brief A block of a file.
- ///
- /// A file block consists of the block data, the block's current position in
- /// the LRU cache, the timestamp (seconds since epoch) at which the block
- /// was cached, a coordination lock, and state & condition variables.
- ///
- /// Thread safety:
- /// The iterator and timestamp fields should only be accessed while holding
- /// the block-cache-wide mu_ instance variable. The state variable should only
- /// be accessed while holding the Block's mu lock. The data vector should only
- /// be accessed after state == FINISHED, and it should never be modified.
- ///
- /// In order to prevent deadlocks, never grab the block-cache-wide mu_ lock
- /// AFTER grabbing any block's mu lock. It is safe to grab mu without locking
- /// mu_.
- struct Block {
- /// The block data.
- std::vector<char> data;
- /// A list iterator pointing to the block's position in the LRU list.
- std::list<Key>::iterator lru_iterator;
- /// A list iterator pointing to the block's position in the LRA list.
- std::list<Key>::iterator lra_iterator;
- /// The timestamp (seconds since epoch) at which the block was cached.
- uint64 timestamp;
- /// Mutex to guard state variable
- mutex mu;
- /// The state of the block.
- FetchState state GUARDED_BY(mu) = FetchState::CREATED;
- /// Wait on cond_var if state is FETCHING.
- condition_variable cond_var;
- };
-
- /// \brief The block map type for the file block cache.
- ///
- /// The block map is an ordered map from Key to Block.
- typedef std::map<Key, std::shared_ptr<Block>> BlockMap;
-
- /// Prune the cache by removing files with expired blocks.
- void Prune() LOCKS_EXCLUDED(mu_);
-
- bool BlockNotStale(const std::shared_ptr<Block>& block)
- EXCLUSIVE_LOCKS_REQUIRED(mu_);
-
- /// Look up a Key in the block cache.
- std::shared_ptr<Block> Lookup(const Key& key) LOCKS_EXCLUDED(mu_);
-
- Status MaybeFetch(const Key& key, const std::shared_ptr<Block>& block)
- LOCKS_EXCLUDED(mu_);
-
- /// Trim the block cache to make room for another entry.
- void Trim() EXCLUSIVE_LOCKS_REQUIRED(mu_);
-
- /// Update the LRU iterator for the block at `key`.
- Status UpdateLRU(const Key& key, const std::shared_ptr<Block>& block)
- LOCKS_EXCLUDED(mu_);
-
- /// Remove all blocks of a file, with mu_ already held.
- void RemoveFile_Locked(const string& filename) EXCLUSIVE_LOCKS_REQUIRED(mu_);
-
- /// Remove the block `entry` from the block map and LRU list, and update the
- /// cache size accordingly.
- void RemoveBlock(BlockMap::iterator entry) EXCLUSIVE_LOCKS_REQUIRED(mu_);
-
- /// The cache pruning thread that removes files with expired blocks.
- std::unique_ptr<Thread> pruning_thread_;
-
- /// Notification for stopping the cache pruning thread.
- Notification stop_pruning_thread_;
-
- /// Guards access to the block map, LRU list, and cached byte count.
- mutable mutex mu_;
-
- /// The block map (map from Key to Block).
- BlockMap block_map_ GUARDED_BY(mu_);
-
- /// The LRU list of block keys. The front of the list identifies the most
- /// recently accessed block.
- std::list<Key> lru_list_ GUARDED_BY(mu_);
-
- /// The LRA (least recently added) list of block keys. The front of the list
- /// identifies the most recently added block.
- ///
- /// Note: blocks are added to lra_list_ only after they have successfully been
- /// fetched from the underlying block store.
- std::list<Key> lra_list_ GUARDED_BY(mu_);
-
- /// The combined number of bytes in all of the cached blocks.
- size_t cache_size_ GUARDED_BY(mu_) = 0;
+ virtual size_t CacheSize() const = 0;
};
} // namespace tensorflow
diff --git a/tensorflow/core/platform/cloud/gcs_file_system.cc b/tensorflow/core/platform/cloud/gcs_file_system.cc
index 01ca0d76ba..84b65cec4f 100644
--- a/tensorflow/core/platform/cloud/gcs_file_system.cc
+++ b/tensorflow/core/platform/cloud/gcs_file_system.cc
@@ -36,6 +36,7 @@ limitations under the License.
#include "tensorflow/core/platform/cloud/curl_http_request.h"
#include "tensorflow/core/platform/cloud/file_block_cache.h"
#include "tensorflow/core/platform/cloud/google_auth_provider.h"
+#include "tensorflow/core/platform/cloud/ram_file_block_cache.h"
#include "tensorflow/core/platform/cloud/retrying_utils.h"
#include "tensorflow/core/platform/cloud/time_util.h"
#include "tensorflow/core/platform/env.h"
@@ -783,13 +784,13 @@ Status GcsFileSystem::NewRandomAccessFile(
// A helper function to build a FileBlockCache for GcsFileSystem.
std::unique_ptr<FileBlockCache> GcsFileSystem::MakeFileBlockCache(
size_t block_size, size_t max_bytes, uint64 max_staleness) {
- std::unique_ptr<FileBlockCache> file_block_cache(
- new FileBlockCache(block_size, max_bytes, max_staleness,
- [this](const string& filename, size_t offset, size_t n,
- char* buffer, size_t* bytes_transferred) {
- return LoadBufferFromGCS(filename, offset, n, buffer,
- bytes_transferred);
- }));
+ std::unique_ptr<FileBlockCache> file_block_cache(new RamFileBlockCache(
+ block_size, max_bytes, max_staleness,
+ [this](const string& filename, size_t offset, size_t n, char* buffer,
+ size_t* bytes_transferred) {
+ return LoadBufferFromGCS(filename, offset, n, buffer,
+ bytes_transferred);
+ }));
return file_block_cache;
}
diff --git a/tensorflow/core/platform/cloud/file_block_cache.cc b/tensorflow/core/platform/cloud/ram_file_block_cache.cc
index 6add1142a1..55a5657a50 100644
--- a/tensorflow/core/platform/cloud/file_block_cache.cc
+++ b/tensorflow/core/platform/cloud/ram_file_block_cache.cc
@@ -13,7 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
-#include "tensorflow/core/platform/cloud/file_block_cache.h"
+#include "tensorflow/core/platform/cloud/ram_file_block_cache.h"
#include <cstring>
#include <memory>
#include "tensorflow/core/lib/gtl/cleanup.h"
@@ -21,7 +21,7 @@ limitations under the License.
namespace tensorflow {
-bool FileBlockCache::BlockNotStale(const std::shared_ptr<Block>& block) {
+bool RamFileBlockCache::BlockNotStale(const std::shared_ptr<Block>& block) {
mutex_lock l(block->mu);
if (block->state != FetchState::FINISHED) {
return true; // No need to check for staleness.
@@ -30,7 +30,8 @@ bool FileBlockCache::BlockNotStale(const std::shared_ptr<Block>& block) {
return env_->NowSeconds() - block->timestamp <= max_staleness_;
}
-std::shared_ptr<FileBlockCache::Block> FileBlockCache::Lookup(const Key& key) {
+std::shared_ptr<RamFileBlockCache::Block> RamFileBlockCache::Lookup(
+ const Key& key) {
mutex_lock lock(mu_);
auto entry = block_map_.find(key);
if (entry != block_map_.end()) {
@@ -55,15 +56,15 @@ std::shared_ptr<FileBlockCache::Block> FileBlockCache::Lookup(const Key& key) {
}
// Remove blocks from the cache until we do not exceed our maximum size.
-void FileBlockCache::Trim() {
+void RamFileBlockCache::Trim() {
while (!lru_list_.empty() && cache_size_ > max_bytes_) {
RemoveBlock(block_map_.find(lru_list_.back()));
}
}
/// Move the block to the front of the LRU list if it isn't already there.
-Status FileBlockCache::UpdateLRU(const Key& key,
- const std::shared_ptr<Block>& block) {
+Status RamFileBlockCache::UpdateLRU(const Key& key,
+ const std::shared_ptr<Block>& block) {
mutex_lock lock(mu_);
if (block->timestamp == 0) {
// The block was evicted from another thread. Allow it to remain evicted.
@@ -92,8 +93,8 @@ Status FileBlockCache::UpdateLRU(const Key& key,
return Status::OK();
}
-Status FileBlockCache::MaybeFetch(const Key& key,
- const std::shared_ptr<Block>& block) {
+Status RamFileBlockCache::MaybeFetch(const Key& key,
+ const std::shared_ptr<Block>& block) {
bool downloaded_block = false;
auto reconcile_state =
gtl::MakeCleanup([this, &downloaded_block, &key, &block] {
@@ -151,11 +152,11 @@ Status FileBlockCache::MaybeFetch(const Key& key,
}
}
return errors::Internal(
- "Control flow should never reach the end of FileBlockCache::Fetch.");
+ "Control flow should never reach the end of RamFileBlockCache::Fetch.");
}
-Status FileBlockCache::Read(const string& filename, size_t offset, size_t n,
- char* buffer, size_t* bytes_transferred) {
+Status RamFileBlockCache::Read(const string& filename, size_t offset, size_t n,
+ char* buffer, size_t* bytes_transferred) {
*bytes_transferred = 0;
if (n == 0) {
return Status::OK();
@@ -216,12 +217,12 @@ Status FileBlockCache::Read(const string& filename, size_t offset, size_t n,
return Status::OK();
}
-size_t FileBlockCache::CacheSize() const {
+size_t RamFileBlockCache::CacheSize() const {
mutex_lock lock(mu_);
return cache_size_;
}
-void FileBlockCache::Prune() {
+void RamFileBlockCache::Prune() {
while (!WaitForNotificationWithTimeout(&stop_pruning_thread_, 1000000)) {
mutex_lock lock(mu_);
uint64 now = env_->NowSeconds();
@@ -238,7 +239,7 @@ void FileBlockCache::Prune() {
}
}
-void FileBlockCache::Flush() {
+void RamFileBlockCache::Flush() {
mutex_lock lock(mu_);
block_map_.clear();
lru_list_.clear();
@@ -246,12 +247,12 @@ void FileBlockCache::Flush() {
cache_size_ = 0;
}
-void FileBlockCache::RemoveFile(const string& filename) {
+void RamFileBlockCache::RemoveFile(const string& filename) {
mutex_lock lock(mu_);
RemoveFile_Locked(filename);
}
-void FileBlockCache::RemoveFile_Locked(const string& filename) {
+void RamFileBlockCache::RemoveFile_Locked(const string& filename) {
Key begin = std::make_pair(filename, 0);
auto it = block_map_.lower_bound(begin);
while (it != block_map_.end() && it->first.first == filename) {
@@ -261,7 +262,7 @@ void FileBlockCache::RemoveFile_Locked(const string& filename) {
}
}
-void FileBlockCache::RemoveBlock(BlockMap::iterator entry) {
+void RamFileBlockCache::RemoveBlock(BlockMap::iterator entry) {
// This signals that the block is removed, and should not be inadvertently
// reinserted into the cache in UpdateLRU.
entry->second->timestamp = 0;
diff --git a/tensorflow/core/platform/cloud/ram_file_block_cache.h b/tensorflow/core/platform/cloud/ram_file_block_cache.h
new file mode 100644
index 0000000000..7fdd7b2e02
--- /dev/null
+++ b/tensorflow/core/platform/cloud/ram_file_block_cache.h
@@ -0,0 +1,229 @@
+/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#ifndef TENSORFLOW_CORE_PLATFORM_CLOUD_RAM_FILE_BLOCK_CACHE_H_
+#define TENSORFLOW_CORE_PLATFORM_CLOUD_RAM_FILE_BLOCK_CACHE_H_
+
+#include <functional>
+#include <list>
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+#include "tensorflow/core/lib/core/status.h"
+#include "tensorflow/core/lib/core/stringpiece.h"
+#include "tensorflow/core/platform/cloud/file_block_cache.h"
+#include "tensorflow/core/platform/env.h"
+#include "tensorflow/core/platform/mutex.h"
+#include "tensorflow/core/platform/notification.h"
+#include "tensorflow/core/platform/thread_annotations.h"
+#include "tensorflow/core/platform/types.h"
+
+namespace tensorflow {
+
+/// \brief An LRU block cache of file contents, keyed by {filename, offset}.
+///
+/// This class should be shared by read-only random access files on a remote
+/// filesystem (e.g. GCS).
+class RamFileBlockCache : public FileBlockCache {
+ public:
+ /// The callback executed when a block is not found in the cache, and needs to
+ /// be fetched from the backing filesystem. This callback is provided when the
+ /// cache is constructed. The returned Status should be OK as long as the
+ /// read from the remote filesystem succeeded (similar to the semantics of the
+ /// read(2) system call).
+ typedef std::function<Status(const string& filename, size_t offset,
+ size_t buffer_size, char* buffer,
+ size_t* bytes_transferred)>
+ BlockFetcher;
+
+ RamFileBlockCache(size_t block_size, size_t max_bytes, uint64 max_staleness,
+ BlockFetcher block_fetcher, Env* env = Env::Default())
+ : block_size_(block_size),
+ max_bytes_(max_bytes),
+ max_staleness_(max_staleness),
+ block_fetcher_(block_fetcher),
+ env_(env) {
+ if (max_staleness_ > 0) {
+ pruning_thread_.reset(env_->StartThread(ThreadOptions(), "TF_prune_FBC",
+ [this] { Prune(); }));
+ }
+ }
+
+ ~RamFileBlockCache() override {
+ if (pruning_thread_) {
+ stop_pruning_thread_.Notify();
+ // Destroying pruning_thread_ will block until Prune() receives the above
+ // notification and returns.
+ pruning_thread_.reset();
+ }
+ }
+
+ /// Read `n` bytes from `filename` starting at `offset` into `buffer`. This
+ /// method will return:
+ ///
+ /// 1) The error from the remote filesystem, if the read from the remote
+ /// filesystem failed.
+ /// 2) PRECONDITION_FAILED if the read from the remote filesystem succeeded,
+ /// but the read returned a partial block, and the LRU cache contained a
+ /// block at a higher offset (indicating that the partial block should have
+ /// been a full block).
+ /// 3) OUT_OF_RANGE if the read from the remote filesystem succeeded, but
+ /// the file contents do not extend past `offset` and thus nothing was
+ /// placed in `buffer`.
+ /// 4) OK otherwise (i.e. the read succeeded, and at least one byte was placed
+ /// in `buffer`).
+ Status Read(const string& filename, size_t offset, size_t n, char* buffer,
+ size_t* bytes_transferred) override;
+
+ /// Remove all cached blocks for `filename`.
+ void RemoveFile(const string& filename) override LOCKS_EXCLUDED(mu_);
+
+ /// Remove all cached data.
+ void Flush() LOCKS_EXCLUDED(mu_) override;
+
+ /// Accessors for cache parameters.
+ size_t block_size() const override { return block_size_; }
+ size_t max_bytes() const override { return max_bytes_; }
+ uint64 max_staleness() const override { return max_staleness_; }
+
+ /// The current size (in bytes) of the cache.
+ size_t CacheSize() const override LOCKS_EXCLUDED(mu_);
+
+ private:
+ /// The size of the blocks stored in the LRU cache, as well as the size of the
+ /// reads from the underlying filesystem.
+ const size_t block_size_;
+ /// The maximum number of bytes (sum of block sizes) allowed in the LRU cache.
+ const size_t max_bytes_;
+ /// The maximum staleness of any block in the LRU cache, in seconds.
+ const uint64 max_staleness_;
+ /// The callback to read a block from the underlying filesystem.
+ const BlockFetcher block_fetcher_;
+ /// The Env from which we read timestamps.
+ Env* const env_; // not owned
+
+ /// \brief The key type for the file block cache.
+ ///
+ /// The file block cache key is a {filename, offset} pair.
+ typedef std::pair<string, size_t> Key;
+
+ /// \brief The state of a block.
+ ///
+ /// A block begins in the CREATED stage. The first thread will attempt to read
+ /// the block from the filesystem, transitioning the state of the block to
+ /// FETCHING. After completing, if the read was successful the state should
+ /// be FINISHED. Otherwise the state should be ERROR. A subsequent read can
+ /// re-fetch the block if the state is ERROR.
+ enum class FetchState {
+ CREATED,
+ FETCHING,
+ FINISHED,
+ ERROR,
+ };
+
+ /// \brief A block of a file.
+ ///
+ /// A file block consists of the block data, the block's current position in
+ /// the LRU cache, the timestamp (seconds since epoch) at which the block
+ /// was cached, a coordination lock, and state & condition variables.
+ ///
+ /// Thread safety:
+ /// The iterator and timestamp fields should only be accessed while holding
+ /// the block-cache-wide mu_ instance variable. The state variable should only
+ /// be accessed while holding the Block's mu lock. The data vector should only
+ /// be accessed after state == FINISHED, and it should never be modified.
+ ///
+ /// In order to prevent deadlocks, never grab the block-cache-wide mu_ lock
+ /// AFTER grabbing any block's mu lock. It is safe to grab mu without locking
+ /// mu_.
+ struct Block {
+ /// The block data.
+ std::vector<char> data;
+ /// A list iterator pointing to the block's position in the LRU list.
+ std::list<Key>::iterator lru_iterator;
+ /// A list iterator pointing to the block's position in the LRA list.
+ std::list<Key>::iterator lra_iterator;
+ /// The timestamp (seconds since epoch) at which the block was cached.
+ uint64 timestamp;
+ /// Mutex to guard state variable
+ mutex mu;
+ /// The state of the block.
+ FetchState state GUARDED_BY(mu) = FetchState::CREATED;
+ /// Wait on cond_var if state is FETCHING.
+ condition_variable cond_var;
+ };
+
+ /// \brief The block map type for the file block cache.
+ ///
+ /// The block map is an ordered map from Key to Block.
+ typedef std::map<Key, std::shared_ptr<Block>> BlockMap;
+
+ /// Prune the cache by removing files with expired blocks.
+ void Prune() LOCKS_EXCLUDED(mu_);
+
+ bool BlockNotStale(const std::shared_ptr<Block>& block)
+ EXCLUSIVE_LOCKS_REQUIRED(mu_);
+
+ /// Look up a Key in the block cache.
+ std::shared_ptr<Block> Lookup(const Key& key) LOCKS_EXCLUDED(mu_);
+
+ Status MaybeFetch(const Key& key, const std::shared_ptr<Block>& block)
+ LOCKS_EXCLUDED(mu_);
+
+ /// Trim the block cache to make room for another entry.
+ void Trim() EXCLUSIVE_LOCKS_REQUIRED(mu_);
+
+ /// Update the LRU iterator for the block at `key`.
+ Status UpdateLRU(const Key& key, const std::shared_ptr<Block>& block)
+ LOCKS_EXCLUDED(mu_);
+
+ /// Remove all blocks of a file, with mu_ already held.
+ void RemoveFile_Locked(const string& filename) EXCLUSIVE_LOCKS_REQUIRED(mu_);
+
+ /// Remove the block `entry` from the block map and LRU list, and update the
+ /// cache size accordingly.
+ void RemoveBlock(BlockMap::iterator entry) EXCLUSIVE_LOCKS_REQUIRED(mu_);
+
+ /// The cache pruning thread that removes files with expired blocks.
+ std::unique_ptr<Thread> pruning_thread_;
+
+ /// Notification for stopping the cache pruning thread.
+ Notification stop_pruning_thread_;
+
+ /// Guards access to the block map, LRU list, and cached byte count.
+ mutable mutex mu_;
+
+ /// The block map (map from Key to Block).
+ BlockMap block_map_ GUARDED_BY(mu_);
+
+ /// The LRU list of block keys. The front of the list identifies the most
+ /// recently accessed block.
+ std::list<Key> lru_list_ GUARDED_BY(mu_);
+
+ /// The LRA (least recently added) list of block keys. The front of the list
+ /// identifies the most recently added block.
+ ///
+ /// Note: blocks are added to lra_list_ only after they have successfully been
+ /// fetched from the underlying block store.
+ std::list<Key> lra_list_ GUARDED_BY(mu_);
+
+ /// The combined number of bytes in all of the cached blocks.
+ size_t cache_size_ GUARDED_BY(mu_) = 0;
+};
+
+} // namespace tensorflow
+
+#endif // TENSORFLOW_CORE_PLATFORM_CLOUD_RAM_FILE_BLOCK_CACHE_H_
diff --git a/tensorflow/core/platform/cloud/file_block_cache_test.cc b/tensorflow/core/platform/cloud/ram_file_block_cache_test.cc
index 596fdbf19e..d555b682a6 100644
--- a/tensorflow/core/platform/cloud/file_block_cache_test.cc
+++ b/tensorflow/core/platform/cloud/ram_file_block_cache_test.cc
@@ -13,7 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
-#include "tensorflow/core/platform/cloud/file_block_cache.h"
+#include "tensorflow/core/platform/cloud/ram_file_block_cache.h"
#include <cstring>
#include "tensorflow/core/lib/core/blocking_counter.h"
#include "tensorflow/core/lib/core/status_test_util.h"
@@ -25,8 +25,8 @@ limitations under the License.
namespace tensorflow {
namespace {
-Status ReadCache(FileBlockCache* cache, const string& filename, size_t offset,
- size_t n, std::vector<char>* out) {
+Status ReadCache(RamFileBlockCache* cache, const string& filename,
+ size_t offset, size_t n, std::vector<char>* out) {
out->clear();
out->resize(n, 0);
size_t bytes_transferred = 0;
@@ -37,7 +37,7 @@ Status ReadCache(FileBlockCache* cache, const string& filename, size_t offset,
return status;
}
-TEST(FileBlockCacheTest, PassThrough) {
+TEST(RamFileBlockCacheTest, PassThrough) {
const string want_filename = "foo/bar";
const size_t want_offset = 42;
const size_t want_n = 1024;
@@ -54,9 +54,9 @@ TEST(FileBlockCacheTest, PassThrough) {
return Status::OK();
};
// If block_size, max_bytes, or both are zero, the cache is a pass-through.
- FileBlockCache cache1(1, 0, 0, fetcher);
- FileBlockCache cache2(0, 1, 0, fetcher);
- FileBlockCache cache3(0, 0, 0, fetcher);
+ RamFileBlockCache cache1(1, 0, 0, fetcher);
+ RamFileBlockCache cache2(0, 1, 0, fetcher);
+ RamFileBlockCache cache3(0, 0, 0, fetcher);
std::vector<char> out;
TF_EXPECT_OK(ReadCache(&cache1, want_filename, want_offset, want_n, &out));
EXPECT_EQ(calls, 1);
@@ -66,7 +66,7 @@ TEST(FileBlockCacheTest, PassThrough) {
EXPECT_EQ(calls, 3);
}
-TEST(FileBlockCacheTest, BlockAlignment) {
+TEST(RamFileBlockCacheTest, BlockAlignment) {
// Initialize a 256-byte buffer. This is the file underlying the reads we'll
// do in this test.
const size_t size = 256;
@@ -89,7 +89,7 @@ TEST(FileBlockCacheTest, BlockAlignment) {
for (size_t block_size = 2; block_size <= 4; block_size++) {
// Make a cache of N-byte block size (1 block) and verify that reads of
// varying offsets and lengths return correct data.
- FileBlockCache cache(block_size, block_size, 0, fetcher);
+ RamFileBlockCache cache(block_size, block_size, 0, fetcher);
for (size_t offset = 0; offset < 10; offset++) {
for (size_t n = block_size - 2; n <= block_size + 2; n++) {
std::vector<char> got;
@@ -117,7 +117,7 @@ TEST(FileBlockCacheTest, BlockAlignment) {
}
}
-TEST(FileBlockCacheTest, CacheHits) {
+TEST(RamFileBlockCacheTest, CacheHits) {
const size_t block_size = 16;
std::set<size_t> calls;
auto fetcher = [&calls, block_size](const string& filename, size_t offset,
@@ -132,7 +132,7 @@ TEST(FileBlockCacheTest, CacheHits) {
return Status::OK();
};
const uint32 block_count = 256;
- FileBlockCache cache(block_size, block_count * block_size, 0, fetcher);
+ RamFileBlockCache cache(block_size, block_count * block_size, 0, fetcher);
std::vector<char> out;
out.resize(block_count, 0);
// The cache has space for `block_count` blocks. The loop with i = 0 should
@@ -146,7 +146,7 @@ TEST(FileBlockCacheTest, CacheHits) {
}
}
-TEST(FileBlockCacheTest, OutOfRange) {
+TEST(RamFileBlockCacheTest, OutOfRange) {
// Tests reads of a 24-byte file with block size 16.
const size_t block_size = 16;
const size_t file_size = 24;
@@ -172,7 +172,7 @@ TEST(FileBlockCacheTest, OutOfRange) {
*bytes_transferred = bytes_to_copy;
return Status::OK();
};
- FileBlockCache cache(block_size, block_size, 0, fetcher);
+ RamFileBlockCache cache(block_size, block_size, 0, fetcher);
std::vector<char> out;
// Reading the first 16 bytes should be fine.
TF_EXPECT_OK(ReadCache(&cache, "", 0, block_size, &out));
@@ -191,7 +191,7 @@ TEST(FileBlockCacheTest, OutOfRange) {
EXPECT_EQ(out.size(), file_size - block_size);
}
-TEST(FileBlockCacheTest, Inconsistent) {
+TEST(RamFileBlockCacheTest, Inconsistent) {
// Tests the detection of interrupted reads leading to partially filled blocks
// where we expected complete blocks.
const size_t block_size = 16;
@@ -205,7 +205,7 @@ TEST(FileBlockCacheTest, Inconsistent) {
*bytes_transferred = 1;
return Status::OK();
};
- FileBlockCache cache(block_size, 2 * block_size, 0, fetcher);
+ RamFileBlockCache cache(block_size, 2 * block_size, 0, fetcher);
std::vector<char> out;
// Read the second block; this should yield an OK status and a single byte.
TF_EXPECT_OK(ReadCache(&cache, "", block_size, block_size, &out));
@@ -216,7 +216,7 @@ TEST(FileBlockCacheTest, Inconsistent) {
EXPECT_EQ(status.code(), error::INTERNAL);
}
-TEST(FileBlockCacheTest, LRU) {
+TEST(RamFileBlockCacheTest, LRU) {
const size_t block_size = 16;
std::list<size_t> calls;
auto fetcher = [&calls, block_size](const string& filename, size_t offset,
@@ -233,7 +233,7 @@ TEST(FileBlockCacheTest, LRU) {
return Status::OK();
};
const uint32 block_count = 2;
- FileBlockCache cache(block_size, block_count * block_size, 0, fetcher);
+ RamFileBlockCache cache(block_size, block_count * block_size, 0, fetcher);
std::vector<char> out;
// Read blocks from the cache, and verify the LRU behavior based on the
// fetcher calls that the cache makes.
@@ -265,7 +265,7 @@ TEST(FileBlockCacheTest, LRU) {
TF_EXPECT_OK(ReadCache(&cache, "", 0, 1, &out));
}
-TEST(FileBlockCacheTest, MaxStaleness) {
+TEST(RamFileBlockCacheTest, MaxStaleness) {
int calls = 0;
auto fetcher = [&calls](const string& filename, size_t offset, size_t n,
char* buffer, size_t* bytes_transferred) {
@@ -278,7 +278,7 @@ TEST(FileBlockCacheTest, MaxStaleness) {
std::unique_ptr<NowSecondsEnv> env(new NowSecondsEnv);
// Create a cache with max staleness of 2 seconds, and verify that it works as
// expected.
- FileBlockCache cache1(8, 16, 2 /* max staleness */, fetcher, env.get());
+ RamFileBlockCache cache1(8, 16, 2 /* max staleness */, fetcher, env.get());
// Execute the first read to load the block.
TF_EXPECT_OK(ReadCache(&cache1, "", 0, 1, &out));
EXPECT_EQ(calls, 1);
@@ -294,7 +294,7 @@ TEST(FileBlockCacheTest, MaxStaleness) {
// as expected.
calls = 0;
env->SetNowSeconds(0);
- FileBlockCache cache2(8, 16, 0 /* max staleness */, fetcher, env.get());
+ RamFileBlockCache cache2(8, 16, 0 /* max staleness */, fetcher, env.get());
// Execute the first read to load the block.
TF_EXPECT_OK(ReadCache(&cache2, "", 0, 1, &out));
EXPECT_EQ(calls, 1);
@@ -305,7 +305,7 @@ TEST(FileBlockCacheTest, MaxStaleness) {
EXPECT_EQ(calls, 1);
}
-TEST(FileBlockCacheTest, RemoveFile) {
+TEST(RamFileBlockCacheTest, RemoveFile) {
int calls = 0;
auto fetcher = [&calls](const string& filename, size_t offset, size_t n,
char* buffer, size_t* bytes_transferred) {
@@ -321,7 +321,7 @@ TEST(FileBlockCacheTest, RemoveFile) {
};
// This cache has space for 4 blocks; we'll read from two files.
const size_t n = 3;
- FileBlockCache cache(8, 32, 0, fetcher);
+ RamFileBlockCache cache(8, 32, 0, fetcher);
std::vector<char> out;
std::vector<char> a(n, 'a');
std::vector<char> b(n, 'b');
@@ -367,7 +367,7 @@ TEST(FileBlockCacheTest, RemoveFile) {
EXPECT_EQ(calls, 6);
}
-TEST(FileBlockCacheTest, Prune) {
+TEST(RamFileBlockCacheTest, Prune) {
int calls = 0;
auto fetcher = [&calls](const string& filename, size_t offset, size_t n,
char* buffer, size_t* bytes_transferred) {
@@ -381,7 +381,7 @@ TEST(FileBlockCacheTest, Prune) {
std::unique_ptr<NowSecondsEnv> env(new NowSecondsEnv);
uint64 now = Env::Default()->NowSeconds();
env->SetNowSeconds(now);
- FileBlockCache cache(8, 32, 1 /* max staleness */, fetcher, env.get());
+ RamFileBlockCache cache(8, 32, 1 /* max staleness */, fetcher, env.get());
// Read three blocks into the cache, and advance the timestamp by one second
// with each read. Start with a block of "a" at the current timestamp `now`.
TF_EXPECT_OK(ReadCache(&cache, "a", 0, 1, &out));
@@ -426,7 +426,7 @@ TEST(FileBlockCacheTest, Prune) {
EXPECT_EQ(cache.CacheSize(), 0);
}
-TEST(FileBlockCacheTest, ParallelReads) {
+TEST(RamFileBlockCacheTest, ParallelReads) {
// This fetcher won't respond until either `callers` threads are calling it
// concurrently (at which point it will respond with success to all callers),
// or 10 seconds have elapsed (at which point it will respond with an error).
@@ -444,7 +444,7 @@ TEST(FileBlockCacheTest, ParallelReads) {
return Status::OK();
};
const int block_size = 8;
- FileBlockCache cache(block_size, 2 * callers * block_size, 0, fetcher);
+ RamFileBlockCache cache(block_size, 2 * callers * block_size, 0, fetcher);
std::vector<std::unique_ptr<Thread>> threads;
for (int i = 0; i < callers; i++) {
threads.emplace_back(
@@ -461,7 +461,7 @@ TEST(FileBlockCacheTest, ParallelReads) {
// executed, or 10 seconds have passed).
}
-TEST(FileBlockCacheTest, CoalesceConcurrentReads) {
+TEST(RamFileBlockCacheTest, CoalesceConcurrentReads) {
// Concurrent reads to the same file blocks should be de-duplicated.
const size_t block_size = 16;
int num_requests = 0;
@@ -479,7 +479,7 @@ TEST(FileBlockCacheTest, CoalesceConcurrentReads) {
Env::Default()->SleepForMicroseconds(100000); // 0.1 secs
return Status::OK();
};
- FileBlockCache cache(block_size, block_size, 0, fetcher);
+ RamFileBlockCache cache(block_size, block_size, 0, fetcher);
// Fork off thread for parallel read.
std::unique_ptr<Thread> concurrent(
Env::Default()->StartThread({}, "concurrent", [&cache, block_size] {
@@ -496,7 +496,7 @@ TEST(FileBlockCacheTest, CoalesceConcurrentReads) {
EXPECT_EQ(1, num_requests);
}
-TEST(FileBlockCacheTest, Flush) {
+TEST(RamFileBlockCacheTest, Flush) {
int calls = 0;
auto fetcher = [&calls](const string& filename, size_t offset, size_t n,
char* buffer, size_t* bytes_transferred) {
@@ -505,7 +505,7 @@ TEST(FileBlockCacheTest, Flush) {
*bytes_transferred = n;
return Status::OK();
};
- FileBlockCache cache(16, 32, 0, fetcher);
+ RamFileBlockCache cache(16, 32, 0, fetcher);
std::vector<char> out;
TF_EXPECT_OK(ReadCache(&cache, "", 0, 16, &out));
TF_EXPECT_OK(ReadCache(&cache, "", 0, 16, &out));