diff options
author | Brennan Saeta <saeta@google.com> | 2018-03-02 12:19:23 -0800 |
---|---|---|
committer | TensorFlower Gardener <gardener@tensorflow.org> | 2018-03-02 12:23:04 -0800 |
commit | 41aa3e75ca35c763c23aeedf2409589b7814c7f1 (patch) | |
tree | a942b94371716b224107e779e3ad529d4936daba | |
parent | 2abc47106624e0102c917535dd6df45561550ade (diff) |
GCS: Extract block cache interface from implementation.
PiperOrigin-RevId: 187652953
-rw-r--r-- | tensorflow/core/platform/cloud/BUILD | 20 | ||||
-rw-r--r-- | tensorflow/core/platform/cloud/file_block_cache.h | 161 | ||||
-rw-r--r-- | tensorflow/core/platform/cloud/gcs_file_system.cc | 15 | ||||
-rw-r--r-- | tensorflow/core/platform/cloud/ram_file_block_cache.cc (renamed from tensorflow/core/platform/cloud/file_block_cache.cc) | 35 | ||||
-rw-r--r-- | tensorflow/core/platform/cloud/ram_file_block_cache.h | 229 | ||||
-rw-r--r-- | tensorflow/core/platform/cloud/ram_file_block_cache_test.cc (renamed from tensorflow/core/platform/cloud/file_block_cache_test.cc) | 60 |
6 files changed, 311 insertions, 209 deletions
diff --git a/tensorflow/core/platform/cloud/BUILD b/tensorflow/core/platform/cloud/BUILD index 9ba25dea4f..0a17a419d3 100644 --- a/tensorflow/core/platform/cloud/BUILD +++ b/tensorflow/core/platform/cloud/BUILD @@ -38,7 +38,6 @@ cc_library( cc_library( name = "file_block_cache", - srcs = ["file_block_cache.cc"], hdrs = ["file_block_cache.h"], copts = tf_copts(), visibility = ["//tensorflow:__subpackages__"], @@ -46,6 +45,18 @@ cc_library( ) cc_library( + name = "ram_file_block_cache", + srcs = ["ram_file_block_cache.cc"], + hdrs = ["ram_file_block_cache.h"], + copts = tf_copts(), + visibility = ["//tensorflow:__subpackages__"], + deps = [ + ":file_block_cache", + "//tensorflow/core:lib", + ], +) + +cc_library( name = "gcs_dns_cache", srcs = ["gcs_dns_cache.cc"], hdrs = ["gcs_dns_cache.h"], @@ -83,6 +94,7 @@ cc_library( ":gcs_throttle", ":google_auth_provider", ":http_request", + ":ram_file_block_cache", ":retrying_file_system", ":retrying_utils", ":time_util", @@ -245,12 +257,12 @@ tf_cc_test( ) tf_cc_test( - name = "file_block_cache_test", + name = "ram_file_block_cache_test", size = "small", - srcs = ["file_block_cache_test.cc"], + srcs = ["ram_file_block_cache_test.cc"], deps = [ - ":file_block_cache", ":now_seconds_env", + ":ram_file_block_cache", "//tensorflow/core:lib", "//tensorflow/core:lib_internal", "//tensorflow/core:test", diff --git a/tensorflow/core/platform/cloud/file_block_cache.h b/tensorflow/core/platform/cloud/file_block_cache.h index 5c180e2332..da16788247 100644 --- a/tensorflow/core/platform/cloud/file_block_cache.h +++ b/tensorflow/core/platform/cloud/file_block_cache.h @@ -32,7 +32,7 @@ limitations under the License. namespace tensorflow { -/// \brief An LRU block cache of file contents, keyed by {filename, offset}. +/// \brief A block cache of file contents, keyed by {filename, offset}. /// /// This class should be shared by read-only random access files on a remote /// filesystem (e.g. GCS). @@ -48,27 +48,7 @@ class FileBlockCache { size_t* bytes_transferred)> BlockFetcher; - FileBlockCache(size_t block_size, size_t max_bytes, uint64 max_staleness, - BlockFetcher block_fetcher, Env* env = Env::Default()) - : block_size_(block_size), - max_bytes_(max_bytes), - max_staleness_(max_staleness), - block_fetcher_(block_fetcher), - env_(env) { - if (max_staleness_ > 0) { - pruning_thread_.reset(env_->StartThread(ThreadOptions(), "TF_prune_FBC", - [this] { Prune(); })); - } - } - - ~FileBlockCache() { - if (pruning_thread_) { - stop_pruning_thread_.Notify(); - // Destroying pruning_thread_ will block until Prune() receives the above - // notification and returns. - pruning_thread_.reset(); - } - } + virtual ~FileBlockCache() {} /// Read `n` bytes from `filename` starting at `offset` into `out`. This /// method will return: @@ -84,143 +64,22 @@ class FileBlockCache { /// placed in `out`. /// 4) OK otherwise (i.e. the read succeeded, and at least one byte was placed /// in `out`). - Status Read(const string& filename, size_t offset, size_t n, char* buffer, - size_t* bytes_transferred); + virtual Status Read(const string& filename, size_t offset, size_t n, + char* buffer, size_t* bytes_transferred) = 0; /// Remove all cached blocks for `filename`. - void RemoveFile(const string& filename) LOCKS_EXCLUDED(mu_); + virtual void RemoveFile(const string& filename) = 0; /// Remove all cached data. - void Flush() LOCKS_EXCLUDED(mu_); + virtual void Flush() = 0; /// Accessors for cache parameters. - size_t block_size() const { return block_size_; } - size_t max_bytes() const { return max_bytes_; } - uint64 max_staleness() const { return max_staleness_; } + virtual size_t block_size() const = 0; + virtual size_t max_bytes() const = 0; + virtual uint64 max_staleness() const = 0; /// The current size (in bytes) of the cache. - size_t CacheSize() const LOCKS_EXCLUDED(mu_); - - private: - /// The size of the blocks stored in the LRU cache, as well as the size of the - /// reads from the underlying filesystem. - const size_t block_size_; - /// The maximum number of bytes (sum of block sizes) allowed in the LRU cache. - const size_t max_bytes_; - /// The maximum staleness of any block in the LRU cache, in seconds. - const uint64 max_staleness_; - /// The callback to read a block from the underlying filesystem. - const BlockFetcher block_fetcher_; - /// The Env from which we read timestamps. - Env* const env_; // not owned - - /// \brief The key type for the file block cache. - /// - /// The file block cache key is a {filename, offset} pair. - typedef std::pair<string, size_t> Key; - - /// \brief The state of a block. - /// - /// A block begins in the CREATED stage. The first thread will attempt to read - /// the block from the filesystem, transitioning the state of the block to - /// FETCHING. After completing, if the read was successful the state should - /// be FINISHED. Otherwise the state should be ERROR. A subsequent read can - /// re-fetch the block if the state is ERROR. - enum class FetchState { - CREATED, - FETCHING, - FINISHED, - ERROR, - }; - - /// \brief A block of a file. - /// - /// A file block consists of the block data, the block's current position in - /// the LRU cache, the timestamp (seconds since epoch) at which the block - /// was cached, a coordination lock, and state & condition variables. - /// - /// Thread safety: - /// The iterator and timestamp fields should only be accessed while holding - /// the block-cache-wide mu_ instance variable. The state variable should only - /// be accessed while holding the Block's mu lock. The data vector should only - /// be accessed after state == FINISHED, and it should never be modified. - /// - /// In order to prevent deadlocks, never grab the block-cache-wide mu_ lock - /// AFTER grabbing any block's mu lock. It is safe to grab mu without locking - /// mu_. - struct Block { - /// The block data. - std::vector<char> data; - /// A list iterator pointing to the block's position in the LRU list. - std::list<Key>::iterator lru_iterator; - /// A list iterator pointing to the block's position in the LRA list. - std::list<Key>::iterator lra_iterator; - /// The timestamp (seconds since epoch) at which the block was cached. - uint64 timestamp; - /// Mutex to guard state variable - mutex mu; - /// The state of the block. - FetchState state GUARDED_BY(mu) = FetchState::CREATED; - /// Wait on cond_var if state is FETCHING. - condition_variable cond_var; - }; - - /// \brief The block map type for the file block cache. - /// - /// The block map is an ordered map from Key to Block. - typedef std::map<Key, std::shared_ptr<Block>> BlockMap; - - /// Prune the cache by removing files with expired blocks. - void Prune() LOCKS_EXCLUDED(mu_); - - bool BlockNotStale(const std::shared_ptr<Block>& block) - EXCLUSIVE_LOCKS_REQUIRED(mu_); - - /// Look up a Key in the block cache. - std::shared_ptr<Block> Lookup(const Key& key) LOCKS_EXCLUDED(mu_); - - Status MaybeFetch(const Key& key, const std::shared_ptr<Block>& block) - LOCKS_EXCLUDED(mu_); - - /// Trim the block cache to make room for another entry. - void Trim() EXCLUSIVE_LOCKS_REQUIRED(mu_); - - /// Update the LRU iterator for the block at `key`. - Status UpdateLRU(const Key& key, const std::shared_ptr<Block>& block) - LOCKS_EXCLUDED(mu_); - - /// Remove all blocks of a file, with mu_ already held. - void RemoveFile_Locked(const string& filename) EXCLUSIVE_LOCKS_REQUIRED(mu_); - - /// Remove the block `entry` from the block map and LRU list, and update the - /// cache size accordingly. - void RemoveBlock(BlockMap::iterator entry) EXCLUSIVE_LOCKS_REQUIRED(mu_); - - /// The cache pruning thread that removes files with expired blocks. - std::unique_ptr<Thread> pruning_thread_; - - /// Notification for stopping the cache pruning thread. - Notification stop_pruning_thread_; - - /// Guards access to the block map, LRU list, and cached byte count. - mutable mutex mu_; - - /// The block map (map from Key to Block). - BlockMap block_map_ GUARDED_BY(mu_); - - /// The LRU list of block keys. The front of the list identifies the most - /// recently accessed block. - std::list<Key> lru_list_ GUARDED_BY(mu_); - - /// The LRA (least recently added) list of block keys. The front of the list - /// identifies the most recently added block. - /// - /// Note: blocks are added to lra_list_ only after they have successfully been - /// fetched from the underlying block store. - std::list<Key> lra_list_ GUARDED_BY(mu_); - - /// The combined number of bytes in all of the cached blocks. - size_t cache_size_ GUARDED_BY(mu_) = 0; + virtual size_t CacheSize() const = 0; }; } // namespace tensorflow diff --git a/tensorflow/core/platform/cloud/gcs_file_system.cc b/tensorflow/core/platform/cloud/gcs_file_system.cc index 01ca0d76ba..84b65cec4f 100644 --- a/tensorflow/core/platform/cloud/gcs_file_system.cc +++ b/tensorflow/core/platform/cloud/gcs_file_system.cc @@ -36,6 +36,7 @@ limitations under the License. #include "tensorflow/core/platform/cloud/curl_http_request.h" #include "tensorflow/core/platform/cloud/file_block_cache.h" #include "tensorflow/core/platform/cloud/google_auth_provider.h" +#include "tensorflow/core/platform/cloud/ram_file_block_cache.h" #include "tensorflow/core/platform/cloud/retrying_utils.h" #include "tensorflow/core/platform/cloud/time_util.h" #include "tensorflow/core/platform/env.h" @@ -783,13 +784,13 @@ Status GcsFileSystem::NewRandomAccessFile( // A helper function to build a FileBlockCache for GcsFileSystem. std::unique_ptr<FileBlockCache> GcsFileSystem::MakeFileBlockCache( size_t block_size, size_t max_bytes, uint64 max_staleness) { - std::unique_ptr<FileBlockCache> file_block_cache( - new FileBlockCache(block_size, max_bytes, max_staleness, - [this](const string& filename, size_t offset, size_t n, - char* buffer, size_t* bytes_transferred) { - return LoadBufferFromGCS(filename, offset, n, buffer, - bytes_transferred); - })); + std::unique_ptr<FileBlockCache> file_block_cache(new RamFileBlockCache( + block_size, max_bytes, max_staleness, + [this](const string& filename, size_t offset, size_t n, char* buffer, + size_t* bytes_transferred) { + return LoadBufferFromGCS(filename, offset, n, buffer, + bytes_transferred); + })); return file_block_cache; } diff --git a/tensorflow/core/platform/cloud/file_block_cache.cc b/tensorflow/core/platform/cloud/ram_file_block_cache.cc index 6add1142a1..55a5657a50 100644 --- a/tensorflow/core/platform/cloud/file_block_cache.cc +++ b/tensorflow/core/platform/cloud/ram_file_block_cache.cc @@ -13,7 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. ==============================================================================*/ -#include "tensorflow/core/platform/cloud/file_block_cache.h" +#include "tensorflow/core/platform/cloud/ram_file_block_cache.h" #include <cstring> #include <memory> #include "tensorflow/core/lib/gtl/cleanup.h" @@ -21,7 +21,7 @@ limitations under the License. namespace tensorflow { -bool FileBlockCache::BlockNotStale(const std::shared_ptr<Block>& block) { +bool RamFileBlockCache::BlockNotStale(const std::shared_ptr<Block>& block) { mutex_lock l(block->mu); if (block->state != FetchState::FINISHED) { return true; // No need to check for staleness. @@ -30,7 +30,8 @@ bool FileBlockCache::BlockNotStale(const std::shared_ptr<Block>& block) { return env_->NowSeconds() - block->timestamp <= max_staleness_; } -std::shared_ptr<FileBlockCache::Block> FileBlockCache::Lookup(const Key& key) { +std::shared_ptr<RamFileBlockCache::Block> RamFileBlockCache::Lookup( + const Key& key) { mutex_lock lock(mu_); auto entry = block_map_.find(key); if (entry != block_map_.end()) { @@ -55,15 +56,15 @@ std::shared_ptr<FileBlockCache::Block> FileBlockCache::Lookup(const Key& key) { } // Remove blocks from the cache until we do not exceed our maximum size. -void FileBlockCache::Trim() { +void RamFileBlockCache::Trim() { while (!lru_list_.empty() && cache_size_ > max_bytes_) { RemoveBlock(block_map_.find(lru_list_.back())); } } /// Move the block to the front of the LRU list if it isn't already there. -Status FileBlockCache::UpdateLRU(const Key& key, - const std::shared_ptr<Block>& block) { +Status RamFileBlockCache::UpdateLRU(const Key& key, + const std::shared_ptr<Block>& block) { mutex_lock lock(mu_); if (block->timestamp == 0) { // The block was evicted from another thread. Allow it to remain evicted. @@ -92,8 +93,8 @@ Status FileBlockCache::UpdateLRU(const Key& key, return Status::OK(); } -Status FileBlockCache::MaybeFetch(const Key& key, - const std::shared_ptr<Block>& block) { +Status RamFileBlockCache::MaybeFetch(const Key& key, + const std::shared_ptr<Block>& block) { bool downloaded_block = false; auto reconcile_state = gtl::MakeCleanup([this, &downloaded_block, &key, &block] { @@ -151,11 +152,11 @@ Status FileBlockCache::MaybeFetch(const Key& key, } } return errors::Internal( - "Control flow should never reach the end of FileBlockCache::Fetch."); + "Control flow should never reach the end of RamFileBlockCache::Fetch."); } -Status FileBlockCache::Read(const string& filename, size_t offset, size_t n, - char* buffer, size_t* bytes_transferred) { +Status RamFileBlockCache::Read(const string& filename, size_t offset, size_t n, + char* buffer, size_t* bytes_transferred) { *bytes_transferred = 0; if (n == 0) { return Status::OK(); @@ -216,12 +217,12 @@ Status FileBlockCache::Read(const string& filename, size_t offset, size_t n, return Status::OK(); } -size_t FileBlockCache::CacheSize() const { +size_t RamFileBlockCache::CacheSize() const { mutex_lock lock(mu_); return cache_size_; } -void FileBlockCache::Prune() { +void RamFileBlockCache::Prune() { while (!WaitForNotificationWithTimeout(&stop_pruning_thread_, 1000000)) { mutex_lock lock(mu_); uint64 now = env_->NowSeconds(); @@ -238,7 +239,7 @@ void FileBlockCache::Prune() { } } -void FileBlockCache::Flush() { +void RamFileBlockCache::Flush() { mutex_lock lock(mu_); block_map_.clear(); lru_list_.clear(); @@ -246,12 +247,12 @@ void FileBlockCache::Flush() { cache_size_ = 0; } -void FileBlockCache::RemoveFile(const string& filename) { +void RamFileBlockCache::RemoveFile(const string& filename) { mutex_lock lock(mu_); RemoveFile_Locked(filename); } -void FileBlockCache::RemoveFile_Locked(const string& filename) { +void RamFileBlockCache::RemoveFile_Locked(const string& filename) { Key begin = std::make_pair(filename, 0); auto it = block_map_.lower_bound(begin); while (it != block_map_.end() && it->first.first == filename) { @@ -261,7 +262,7 @@ void FileBlockCache::RemoveFile_Locked(const string& filename) { } } -void FileBlockCache::RemoveBlock(BlockMap::iterator entry) { +void RamFileBlockCache::RemoveBlock(BlockMap::iterator entry) { // This signals that the block is removed, and should not be inadvertently // reinserted into the cache in UpdateLRU. entry->second->timestamp = 0; diff --git a/tensorflow/core/platform/cloud/ram_file_block_cache.h b/tensorflow/core/platform/cloud/ram_file_block_cache.h new file mode 100644 index 0000000000..7fdd7b2e02 --- /dev/null +++ b/tensorflow/core/platform/cloud/ram_file_block_cache.h @@ -0,0 +1,229 @@ +/* Copyright 2017 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#ifndef TENSORFLOW_CORE_PLATFORM_CLOUD_RAM_FILE_BLOCK_CACHE_H_ +#define TENSORFLOW_CORE_PLATFORM_CLOUD_RAM_FILE_BLOCK_CACHE_H_ + +#include <functional> +#include <list> +#include <map> +#include <memory> +#include <string> +#include <vector> +#include "tensorflow/core/lib/core/status.h" +#include "tensorflow/core/lib/core/stringpiece.h" +#include "tensorflow/core/platform/cloud/file_block_cache.h" +#include "tensorflow/core/platform/env.h" +#include "tensorflow/core/platform/mutex.h" +#include "tensorflow/core/platform/notification.h" +#include "tensorflow/core/platform/thread_annotations.h" +#include "tensorflow/core/platform/types.h" + +namespace tensorflow { + +/// \brief An LRU block cache of file contents, keyed by {filename, offset}. +/// +/// This class should be shared by read-only random access files on a remote +/// filesystem (e.g. GCS). +class RamFileBlockCache : public FileBlockCache { + public: + /// The callback executed when a block is not found in the cache, and needs to + /// be fetched from the backing filesystem. This callback is provided when the + /// cache is constructed. The returned Status should be OK as long as the + /// read from the remote filesystem succeeded (similar to the semantics of the + /// read(2) system call). + typedef std::function<Status(const string& filename, size_t offset, + size_t buffer_size, char* buffer, + size_t* bytes_transferred)> + BlockFetcher; + + RamFileBlockCache(size_t block_size, size_t max_bytes, uint64 max_staleness, + BlockFetcher block_fetcher, Env* env = Env::Default()) + : block_size_(block_size), + max_bytes_(max_bytes), + max_staleness_(max_staleness), + block_fetcher_(block_fetcher), + env_(env) { + if (max_staleness_ > 0) { + pruning_thread_.reset(env_->StartThread(ThreadOptions(), "TF_prune_FBC", + [this] { Prune(); })); + } + } + + ~RamFileBlockCache() override { + if (pruning_thread_) { + stop_pruning_thread_.Notify(); + // Destroying pruning_thread_ will block until Prune() receives the above + // notification and returns. + pruning_thread_.reset(); + } + } + + /// Read `n` bytes from `filename` starting at `offset` into `out`. This + /// method will return: + /// + /// 1) The error from the remote filesystem, if the read from the remote + /// filesystem failed. + /// 2) PRECONDITION_FAILED if the read from the remote filesystem succeeded, + /// but the read returned a partial block, and the LRU cache contained a + /// block at a higher offset (indicating that the partial block should have + /// been a full block). + /// 3) OUT_OF_RANGE if the read from the remote filesystem succeeded, but + /// the file contents do not extend past `offset` and thus nothing was + /// placed in `out`. + /// 4) OK otherwise (i.e. the read succeeded, and at least one byte was placed + /// in `out`). + Status Read(const string& filename, size_t offset, size_t n, char* buffer, + size_t* bytes_transferred) override; + + /// Remove all cached blocks for `filename`. + void RemoveFile(const string& filename) override LOCKS_EXCLUDED(mu_); + + /// Remove all cached data. + void Flush() LOCKS_EXCLUDED(mu_) override; + + /// Accessors for cache parameters. + size_t block_size() const override { return block_size_; } + size_t max_bytes() const override { return max_bytes_; } + uint64 max_staleness() const override { return max_staleness_; } + + /// The current size (in bytes) of the cache. + size_t CacheSize() const override LOCKS_EXCLUDED(mu_); + + private: + /// The size of the blocks stored in the LRU cache, as well as the size of the + /// reads from the underlying filesystem. + const size_t block_size_; + /// The maximum number of bytes (sum of block sizes) allowed in the LRU cache. + const size_t max_bytes_; + /// The maximum staleness of any block in the LRU cache, in seconds. + const uint64 max_staleness_; + /// The callback to read a block from the underlying filesystem. + const BlockFetcher block_fetcher_; + /// The Env from which we read timestamps. + Env* const env_; // not owned + + /// \brief The key type for the file block cache. + /// + /// The file block cache key is a {filename, offset} pair. + typedef std::pair<string, size_t> Key; + + /// \brief The state of a block. + /// + /// A block begins in the CREATED stage. The first thread will attempt to read + /// the block from the filesystem, transitioning the state of the block to + /// FETCHING. After completing, if the read was successful the state should + /// be FINISHED. Otherwise the state should be ERROR. A subsequent read can + /// re-fetch the block if the state is ERROR. + enum class FetchState { + CREATED, + FETCHING, + FINISHED, + ERROR, + }; + + /// \brief A block of a file. + /// + /// A file block consists of the block data, the block's current position in + /// the LRU cache, the timestamp (seconds since epoch) at which the block + /// was cached, a coordination lock, and state & condition variables. + /// + /// Thread safety: + /// The iterator and timestamp fields should only be accessed while holding + /// the block-cache-wide mu_ instance variable. The state variable should only + /// be accessed while holding the Block's mu lock. The data vector should only + /// be accessed after state == FINISHED, and it should never be modified. + /// + /// In order to prevent deadlocks, never grab the block-cache-wide mu_ lock + /// AFTER grabbing any block's mu lock. It is safe to grab mu without locking + /// mu_. + struct Block { + /// The block data. + std::vector<char> data; + /// A list iterator pointing to the block's position in the LRU list. + std::list<Key>::iterator lru_iterator; + /// A list iterator pointing to the block's position in the LRA list. + std::list<Key>::iterator lra_iterator; + /// The timestamp (seconds since epoch) at which the block was cached. + uint64 timestamp; + /// Mutex to guard state variable + mutex mu; + /// The state of the block. + FetchState state GUARDED_BY(mu) = FetchState::CREATED; + /// Wait on cond_var if state is FETCHING. + condition_variable cond_var; + }; + + /// \brief The block map type for the file block cache. + /// + /// The block map is an ordered map from Key to Block. + typedef std::map<Key, std::shared_ptr<Block>> BlockMap; + + /// Prune the cache by removing files with expired blocks. + void Prune() LOCKS_EXCLUDED(mu_); + + bool BlockNotStale(const std::shared_ptr<Block>& block) + EXCLUSIVE_LOCKS_REQUIRED(mu_); + + /// Look up a Key in the block cache. + std::shared_ptr<Block> Lookup(const Key& key) LOCKS_EXCLUDED(mu_); + + Status MaybeFetch(const Key& key, const std::shared_ptr<Block>& block) + LOCKS_EXCLUDED(mu_); + + /// Trim the block cache to make room for another entry. + void Trim() EXCLUSIVE_LOCKS_REQUIRED(mu_); + + /// Update the LRU iterator for the block at `key`. + Status UpdateLRU(const Key& key, const std::shared_ptr<Block>& block) + LOCKS_EXCLUDED(mu_); + + /// Remove all blocks of a file, with mu_ already held. + void RemoveFile_Locked(const string& filename) EXCLUSIVE_LOCKS_REQUIRED(mu_); + + /// Remove the block `entry` from the block map and LRU list, and update the + /// cache size accordingly. + void RemoveBlock(BlockMap::iterator entry) EXCLUSIVE_LOCKS_REQUIRED(mu_); + + /// The cache pruning thread that removes files with expired blocks. + std::unique_ptr<Thread> pruning_thread_; + + /// Notification for stopping the cache pruning thread. + Notification stop_pruning_thread_; + + /// Guards access to the block map, LRU list, and cached byte count. + mutable mutex mu_; + + /// The block map (map from Key to Block). + BlockMap block_map_ GUARDED_BY(mu_); + + /// The LRU list of block keys. The front of the list identifies the most + /// recently accessed block. + std::list<Key> lru_list_ GUARDED_BY(mu_); + + /// The LRA (least recently added) list of block keys. The front of the list + /// identifies the most recently added block. + /// + /// Note: blocks are added to lra_list_ only after they have successfully been + /// fetched from the underlying block store. + std::list<Key> lra_list_ GUARDED_BY(mu_); + + /// The combined number of bytes in all of the cached blocks. + size_t cache_size_ GUARDED_BY(mu_) = 0; +}; + +} // namespace tensorflow + +#endif // TENSORFLOW_CORE_PLATFORM_CLOUD_RAM_FILE_BLOCK_CACHE_H_ diff --git a/tensorflow/core/platform/cloud/file_block_cache_test.cc b/tensorflow/core/platform/cloud/ram_file_block_cache_test.cc index 596fdbf19e..d555b682a6 100644 --- a/tensorflow/core/platform/cloud/file_block_cache_test.cc +++ b/tensorflow/core/platform/cloud/ram_file_block_cache_test.cc @@ -13,7 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. ==============================================================================*/ -#include "tensorflow/core/platform/cloud/file_block_cache.h" +#include "tensorflow/core/platform/cloud/ram_file_block_cache.h" #include <cstring> #include "tensorflow/core/lib/core/blocking_counter.h" #include "tensorflow/core/lib/core/status_test_util.h" @@ -25,8 +25,8 @@ limitations under the License. namespace tensorflow { namespace { -Status ReadCache(FileBlockCache* cache, const string& filename, size_t offset, - size_t n, std::vector<char>* out) { +Status ReadCache(RamFileBlockCache* cache, const string& filename, + size_t offset, size_t n, std::vector<char>* out) { out->clear(); out->resize(n, 0); size_t bytes_transferred = 0; @@ -37,7 +37,7 @@ Status ReadCache(FileBlockCache* cache, const string& filename, size_t offset, return status; } -TEST(FileBlockCacheTest, PassThrough) { +TEST(RamFileBlockCacheTest, PassThrough) { const string want_filename = "foo/bar"; const size_t want_offset = 42; const size_t want_n = 1024; @@ -54,9 +54,9 @@ TEST(FileBlockCacheTest, PassThrough) { return Status::OK(); }; // If block_size, max_bytes, or both are zero, the cache is a pass-through. - FileBlockCache cache1(1, 0, 0, fetcher); - FileBlockCache cache2(0, 1, 0, fetcher); - FileBlockCache cache3(0, 0, 0, fetcher); + RamFileBlockCache cache1(1, 0, 0, fetcher); + RamFileBlockCache cache2(0, 1, 0, fetcher); + RamFileBlockCache cache3(0, 0, 0, fetcher); std::vector<char> out; TF_EXPECT_OK(ReadCache(&cache1, want_filename, want_offset, want_n, &out)); EXPECT_EQ(calls, 1); @@ -66,7 +66,7 @@ TEST(FileBlockCacheTest, PassThrough) { EXPECT_EQ(calls, 3); } -TEST(FileBlockCacheTest, BlockAlignment) { +TEST(RamFileBlockCacheTest, BlockAlignment) { // Initialize a 256-byte buffer. This is the file underlying the reads we'll // do in this test. const size_t size = 256; @@ -89,7 +89,7 @@ TEST(FileBlockCacheTest, BlockAlignment) { for (size_t block_size = 2; block_size <= 4; block_size++) { // Make a cache of N-byte block size (1 block) and verify that reads of // varying offsets and lengths return correct data. - FileBlockCache cache(block_size, block_size, 0, fetcher); + RamFileBlockCache cache(block_size, block_size, 0, fetcher); for (size_t offset = 0; offset < 10; offset++) { for (size_t n = block_size - 2; n <= block_size + 2; n++) { std::vector<char> got; @@ -117,7 +117,7 @@ TEST(FileBlockCacheTest, BlockAlignment) { } } -TEST(FileBlockCacheTest, CacheHits) { +TEST(RamFileBlockCacheTest, CacheHits) { const size_t block_size = 16; std::set<size_t> calls; auto fetcher = [&calls, block_size](const string& filename, size_t offset, @@ -132,7 +132,7 @@ TEST(FileBlockCacheTest, CacheHits) { return Status::OK(); }; const uint32 block_count = 256; - FileBlockCache cache(block_size, block_count * block_size, 0, fetcher); + RamFileBlockCache cache(block_size, block_count * block_size, 0, fetcher); std::vector<char> out; out.resize(block_count, 0); // The cache has space for `block_count` blocks. The loop with i = 0 should @@ -146,7 +146,7 @@ TEST(FileBlockCacheTest, CacheHits) { } } -TEST(FileBlockCacheTest, OutOfRange) { +TEST(RamFileBlockCacheTest, OutOfRange) { // Tests reads of a 24-byte file with block size 16. const size_t block_size = 16; const size_t file_size = 24; @@ -172,7 +172,7 @@ TEST(FileBlockCacheTest, OutOfRange) { *bytes_transferred = bytes_to_copy; return Status::OK(); }; - FileBlockCache cache(block_size, block_size, 0, fetcher); + RamFileBlockCache cache(block_size, block_size, 0, fetcher); std::vector<char> out; // Reading the first 16 bytes should be fine. TF_EXPECT_OK(ReadCache(&cache, "", 0, block_size, &out)); @@ -191,7 +191,7 @@ TEST(FileBlockCacheTest, OutOfRange) { EXPECT_EQ(out.size(), file_size - block_size); } -TEST(FileBlockCacheTest, Inconsistent) { +TEST(RamFileBlockCacheTest, Inconsistent) { // Tests the detection of interrupted reads leading to partially filled blocks // where we expected complete blocks. const size_t block_size = 16; @@ -205,7 +205,7 @@ TEST(FileBlockCacheTest, Inconsistent) { *bytes_transferred = 1; return Status::OK(); }; - FileBlockCache cache(block_size, 2 * block_size, 0, fetcher); + RamFileBlockCache cache(block_size, 2 * block_size, 0, fetcher); std::vector<char> out; // Read the second block; this should yield an OK status and a single byte. TF_EXPECT_OK(ReadCache(&cache, "", block_size, block_size, &out)); @@ -216,7 +216,7 @@ TEST(FileBlockCacheTest, Inconsistent) { EXPECT_EQ(status.code(), error::INTERNAL); } -TEST(FileBlockCacheTest, LRU) { +TEST(RamFileBlockCacheTest, LRU) { const size_t block_size = 16; std::list<size_t> calls; auto fetcher = [&calls, block_size](const string& filename, size_t offset, @@ -233,7 +233,7 @@ TEST(FileBlockCacheTest, LRU) { return Status::OK(); }; const uint32 block_count = 2; - FileBlockCache cache(block_size, block_count * block_size, 0, fetcher); + RamFileBlockCache cache(block_size, block_count * block_size, 0, fetcher); std::vector<char> out; // Read blocks from the cache, and verify the LRU behavior based on the // fetcher calls that the cache makes. @@ -265,7 +265,7 @@ TEST(FileBlockCacheTest, LRU) { TF_EXPECT_OK(ReadCache(&cache, "", 0, 1, &out)); } -TEST(FileBlockCacheTest, MaxStaleness) { +TEST(RamFileBlockCacheTest, MaxStaleness) { int calls = 0; auto fetcher = [&calls](const string& filename, size_t offset, size_t n, char* buffer, size_t* bytes_transferred) { @@ -278,7 +278,7 @@ TEST(FileBlockCacheTest, MaxStaleness) { std::unique_ptr<NowSecondsEnv> env(new NowSecondsEnv); // Create a cache with max staleness of 2 seconds, and verify that it works as // expected. - FileBlockCache cache1(8, 16, 2 /* max staleness */, fetcher, env.get()); + RamFileBlockCache cache1(8, 16, 2 /* max staleness */, fetcher, env.get()); // Execute the first read to load the block. TF_EXPECT_OK(ReadCache(&cache1, "", 0, 1, &out)); EXPECT_EQ(calls, 1); @@ -294,7 +294,7 @@ TEST(FileBlockCacheTest, MaxStaleness) { // as expected. calls = 0; env->SetNowSeconds(0); - FileBlockCache cache2(8, 16, 0 /* max staleness */, fetcher, env.get()); + RamFileBlockCache cache2(8, 16, 0 /* max staleness */, fetcher, env.get()); // Execute the first read to load the block. TF_EXPECT_OK(ReadCache(&cache2, "", 0, 1, &out)); EXPECT_EQ(calls, 1); @@ -305,7 +305,7 @@ TEST(FileBlockCacheTest, MaxStaleness) { EXPECT_EQ(calls, 1); } -TEST(FileBlockCacheTest, RemoveFile) { +TEST(RamFileBlockCacheTest, RemoveFile) { int calls = 0; auto fetcher = [&calls](const string& filename, size_t offset, size_t n, char* buffer, size_t* bytes_transferred) { @@ -321,7 +321,7 @@ TEST(FileBlockCacheTest, RemoveFile) { }; // This cache has space for 4 blocks; we'll read from two files. const size_t n = 3; - FileBlockCache cache(8, 32, 0, fetcher); + RamFileBlockCache cache(8, 32, 0, fetcher); std::vector<char> out; std::vector<char> a(n, 'a'); std::vector<char> b(n, 'b'); @@ -367,7 +367,7 @@ TEST(FileBlockCacheTest, RemoveFile) { EXPECT_EQ(calls, 6); } -TEST(FileBlockCacheTest, Prune) { +TEST(RamFileBlockCacheTest, Prune) { int calls = 0; auto fetcher = [&calls](const string& filename, size_t offset, size_t n, char* buffer, size_t* bytes_transferred) { @@ -381,7 +381,7 @@ TEST(FileBlockCacheTest, Prune) { std::unique_ptr<NowSecondsEnv> env(new NowSecondsEnv); uint64 now = Env::Default()->NowSeconds(); env->SetNowSeconds(now); - FileBlockCache cache(8, 32, 1 /* max staleness */, fetcher, env.get()); + RamFileBlockCache cache(8, 32, 1 /* max staleness */, fetcher, env.get()); // Read three blocks into the cache, and advance the timestamp by one second // with each read. Start with a block of "a" at the current timestamp `now`. TF_EXPECT_OK(ReadCache(&cache, "a", 0, 1, &out)); @@ -426,7 +426,7 @@ TEST(FileBlockCacheTest, Prune) { EXPECT_EQ(cache.CacheSize(), 0); } -TEST(FileBlockCacheTest, ParallelReads) { +TEST(RamFileBlockCacheTest, ParallelReads) { // This fetcher won't respond until either `callers` threads are calling it // concurrently (at which point it will respond with success to all callers), // or 10 seconds have elapsed (at which point it will respond with an error). @@ -444,7 +444,7 @@ TEST(FileBlockCacheTest, ParallelReads) { return Status::OK(); }; const int block_size = 8; - FileBlockCache cache(block_size, 2 * callers * block_size, 0, fetcher); + RamFileBlockCache cache(block_size, 2 * callers * block_size, 0, fetcher); std::vector<std::unique_ptr<Thread>> threads; for (int i = 0; i < callers; i++) { threads.emplace_back( @@ -461,7 +461,7 @@ TEST(FileBlockCacheTest, ParallelReads) { // executed, or 10 seconds have passed). } -TEST(FileBlockCacheTest, CoalesceConcurrentReads) { +TEST(RamFileBlockCacheTest, CoalesceConcurrentReads) { // Concurrent reads to the same file blocks should be de-duplicated. const size_t block_size = 16; int num_requests = 0; @@ -479,7 +479,7 @@ TEST(FileBlockCacheTest, CoalesceConcurrentReads) { Env::Default()->SleepForMicroseconds(100000); // 0.1 secs return Status::OK(); }; - FileBlockCache cache(block_size, block_size, 0, fetcher); + RamFileBlockCache cache(block_size, block_size, 0, fetcher); // Fork off thread for parallel read. std::unique_ptr<Thread> concurrent( Env::Default()->StartThread({}, "concurrent", [&cache, block_size] { @@ -496,7 +496,7 @@ TEST(FileBlockCacheTest, CoalesceConcurrentReads) { EXPECT_EQ(1, num_requests); } -TEST(FileBlockCacheTest, Flush) { +TEST(RamFileBlockCacheTest, Flush) { int calls = 0; auto fetcher = [&calls](const string& filename, size_t offset, size_t n, char* buffer, size_t* bytes_transferred) { @@ -505,7 +505,7 @@ TEST(FileBlockCacheTest, Flush) { *bytes_transferred = n; return Status::OK(); }; - FileBlockCache cache(16, 32, 0, fetcher); + RamFileBlockCache cache(16, 32, 0, fetcher); std::vector<char> out; TF_EXPECT_OK(ReadCache(&cache, "", 0, 16, &out)); TF_EXPECT_OK(ReadCache(&cache, "", 0, 16, &out)); |