From 41aa3e75ca35c763c23aeedf2409589b7814c7f1 Mon Sep 17 00:00:00 2001 From: Brennan Saeta Date: Fri, 2 Mar 2018 12:19:23 -0800 Subject: GCS: Extract block cache interface from implementation. PiperOrigin-RevId: 187652953 --- tensorflow/core/platform/cloud/BUILD | 20 +- tensorflow/core/platform/cloud/file_block_cache.cc | 274 ----------- tensorflow/core/platform/cloud/file_block_cache.h | 161 +------ .../core/platform/cloud/file_block_cache_test.cc | 519 --------------------- tensorflow/core/platform/cloud/gcs_file_system.cc | 15 +- .../core/platform/cloud/ram_file_block_cache.cc | 275 +++++++++++ .../core/platform/cloud/ram_file_block_cache.h | 229 +++++++++ .../platform/cloud/ram_file_block_cache_test.cc | 519 +++++++++++++++++++++ 8 files changed, 1057 insertions(+), 955 deletions(-) delete mode 100644 tensorflow/core/platform/cloud/file_block_cache.cc delete mode 100644 tensorflow/core/platform/cloud/file_block_cache_test.cc create mode 100644 tensorflow/core/platform/cloud/ram_file_block_cache.cc create mode 100644 tensorflow/core/platform/cloud/ram_file_block_cache.h create mode 100644 tensorflow/core/platform/cloud/ram_file_block_cache_test.cc diff --git a/tensorflow/core/platform/cloud/BUILD b/tensorflow/core/platform/cloud/BUILD index 9ba25dea4f..0a17a419d3 100644 --- a/tensorflow/core/platform/cloud/BUILD +++ b/tensorflow/core/platform/cloud/BUILD @@ -38,13 +38,24 @@ cc_library( cc_library( name = "file_block_cache", - srcs = ["file_block_cache.cc"], hdrs = ["file_block_cache.h"], copts = tf_copts(), visibility = ["//tensorflow:__subpackages__"], deps = ["//tensorflow/core:lib"], ) +cc_library( + name = "ram_file_block_cache", + srcs = ["ram_file_block_cache.cc"], + hdrs = ["ram_file_block_cache.h"], + copts = tf_copts(), + visibility = ["//tensorflow:__subpackages__"], + deps = [ + ":file_block_cache", + "//tensorflow/core:lib", + ], +) + cc_library( name = "gcs_dns_cache", srcs = ["gcs_dns_cache.cc"], @@ -83,6 +94,7 @@ cc_library( ":gcs_throttle", ":google_auth_provider", ":http_request", + ":ram_file_block_cache", ":retrying_file_system", ":retrying_utils", ":time_util", @@ -245,12 +257,12 @@ tf_cc_test( ) tf_cc_test( - name = "file_block_cache_test", + name = "ram_file_block_cache_test", size = "small", - srcs = ["file_block_cache_test.cc"], + srcs = ["ram_file_block_cache_test.cc"], deps = [ - ":file_block_cache", ":now_seconds_env", + ":ram_file_block_cache", "//tensorflow/core:lib", "//tensorflow/core:lib_internal", "//tensorflow/core:test", diff --git a/tensorflow/core/platform/cloud/file_block_cache.cc b/tensorflow/core/platform/cloud/file_block_cache.cc deleted file mode 100644 index 6add1142a1..0000000000 --- a/tensorflow/core/platform/cloud/file_block_cache.cc +++ /dev/null @@ -1,274 +0,0 @@ -/* Copyright 2017 The TensorFlow Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-==============================================================================*/ - -#include "tensorflow/core/platform/cloud/file_block_cache.h" -#include -#include -#include "tensorflow/core/lib/gtl/cleanup.h" -#include "tensorflow/core/platform/env.h" - -namespace tensorflow { - -bool FileBlockCache::BlockNotStale(const std::shared_ptr& block) { - mutex_lock l(block->mu); - if (block->state != FetchState::FINISHED) { - return true; // No need to check for staleness. - } - if (max_staleness_ == 0) return true; // Not enforcing staleness. - return env_->NowSeconds() - block->timestamp <= max_staleness_; -} - -std::shared_ptr FileBlockCache::Lookup(const Key& key) { - mutex_lock lock(mu_); - auto entry = block_map_.find(key); - if (entry != block_map_.end()) { - if (BlockNotStale(entry->second)) { - return entry->second; - } else { - // Remove the stale block and continue. - RemoveFile_Locked(key.first); - } - } - - // Insert a new empty block, setting the bookkeeping to sentinel values - // in order to update them as appropriate. - auto new_entry = std::make_shared(); - lru_list_.push_front(key); - lra_list_.push_front(key); - new_entry->lru_iterator = lru_list_.begin(); - new_entry->lra_iterator = lra_list_.begin(); - new_entry->timestamp = env_->NowSeconds(); - block_map_.emplace(std::make_pair(key, new_entry)); - return new_entry; -} - -// Remove blocks from the cache until we do not exceed our maximum size. -void FileBlockCache::Trim() { - while (!lru_list_.empty() && cache_size_ > max_bytes_) { - RemoveBlock(block_map_.find(lru_list_.back())); - } -} - -/// Move the block to the front of the LRU list if it isn't already there. -Status FileBlockCache::UpdateLRU(const Key& key, - const std::shared_ptr& block) { - mutex_lock lock(mu_); - if (block->timestamp == 0) { - // The block was evicted from another thread. Allow it to remain evicted. - return Status::OK(); - } - if (block->lru_iterator != lru_list_.begin()) { - lru_list_.erase(block->lru_iterator); - lru_list_.push_front(key); - block->lru_iterator = lru_list_.begin(); - } - - // Check for inconsistent state. If there is a block later in the same file - // in the cache, and our current block is not block size, this likely means - // we have inconsistent state within the cache. Note: it's possible some - // incomplete reads may still go undetected. - if (block->data.size() < block_size_) { - Key fmax = std::make_pair(key.first, std::numeric_limits::max()); - auto fcmp = block_map_.upper_bound(fmax); - if (fcmp != block_map_.begin() && key < (--fcmp)->first) { - return errors::Internal("Block cache contents are inconsistent."); - } - } - - Trim(); - - return Status::OK(); -} - -Status FileBlockCache::MaybeFetch(const Key& key, - const std::shared_ptr& block) { - bool downloaded_block = false; - auto reconcile_state = - gtl::MakeCleanup([this, &downloaded_block, &key, &block] { - // Perform this action in a cleanup callback to avoid locking mu_ after - // locking block->mu. - if (downloaded_block) { - mutex_lock l(mu_); - // Do not update state if the block is already to be evicted. - if (block->timestamp != 0) { - cache_size_ += block->data.size(); - // Put to beginning of LRA list. - lra_list_.erase(block->lra_iterator); - lra_list_.push_front(key); - block->lra_iterator = lra_list_.begin(); - block->timestamp = env_->NowSeconds(); - } - } - }); - // Loop until either block content is successfully fetched, or our request - // encounters an error. 
- mutex_lock l(block->mu); - Status status = Status::OK(); - while (true) { - switch (block->state) { - case FetchState::ERROR: - TF_FALLTHROUGH_INTENDED; - case FetchState::CREATED: - block->state = FetchState::FETCHING; - block->mu.unlock(); // Release the lock while making the API call. - block->data.clear(); - block->data.resize(block_size_, 0); - size_t bytes_transferred; - status.Update(block_fetcher_(key.first, key.second, block_size_, - block->data.data(), &bytes_transferred)); - block->mu.lock(); // Reacquire the lock immediately afterwards - if (status.ok()) { - block->data.resize(bytes_transferred, 0); - block->data.shrink_to_fit(); - downloaded_block = true; - block->state = FetchState::FINISHED; - } else { - block->state = FetchState::ERROR; - } - block->cond_var.notify_all(); - return status; - case FetchState::FETCHING: - block->cond_var.wait_for(l, std::chrono::seconds(60)); - if (block->state == FetchState::FINISHED) { - return Status::OK(); - } - // Re-loop in case of errors. - break; - case FetchState::FINISHED: - return Status::OK(); - } - } - return errors::Internal( - "Control flow should never reach the end of FileBlockCache::Fetch."); -} - -Status FileBlockCache::Read(const string& filename, size_t offset, size_t n, - char* buffer, size_t* bytes_transferred) { - *bytes_transferred = 0; - if (n == 0) { - return Status::OK(); - } - if (block_size_ == 0 || max_bytes_ == 0) { - // The cache is effectively disabled, so we pass the read through to the - // fetcher without breaking it up into blocks. - return block_fetcher_(filename, offset, n, buffer, bytes_transferred); - } - // Calculate the block-aligned start and end of the read. - size_t start = block_size_ * (offset / block_size_); - size_t finish = block_size_ * ((offset + n) / block_size_); - if (finish < offset + n) { - finish += block_size_; - } - size_t total_bytes_transferred = 0; - // Now iterate through the blocks, reading them one at a time. - for (size_t pos = start; pos < finish; pos += block_size_) { - Key key = std::make_pair(filename, pos); - // Look up the block, fetching and inserting it if necessary, and update the - // LRU iterator for the key and block. - std::shared_ptr block = Lookup(key); - DCHECK(block) << "No block for key " << key.first << "@" << key.second; - TF_RETURN_IF_ERROR(MaybeFetch(key, block)); - TF_RETURN_IF_ERROR(UpdateLRU(key, block)); - // Copy the relevant portion of the block into the result buffer. - const auto& data = block->data; - if (offset >= pos + data.size()) { - // The requested offset is at or beyond the end of the file. This can - // happen if `offset` is not block-aligned, and the read returns the last - // block in the file, which does not extend all the way out to `offset`. - *bytes_transferred = total_bytes_transferred; - return errors::OutOfRange("EOF at offset ", offset, " in file ", filename, - " at position ", pos, "with data size ", - data.size()); - } - auto begin = data.begin(); - if (offset > pos) { - // The block begins before the slice we're reading. - begin += offset - pos; - } - auto end = data.end(); - if (pos + data.size() > offset + n) { - // The block extends past the end of the slice we're reading. - end -= (pos + data.size()) - (offset + n); - } - if (begin < end) { - size_t bytes_to_copy = end - begin; - memcpy(&buffer[total_bytes_transferred], &*begin, bytes_to_copy); - total_bytes_transferred += bytes_to_copy; - } - if (data.size() < block_size_) { - // The block was a partial block and thus signals EOF at its upper bound. 
- break; - } - } - *bytes_transferred = total_bytes_transferred; - return Status::OK(); -} - -size_t FileBlockCache::CacheSize() const { - mutex_lock lock(mu_); - return cache_size_; -} - -void FileBlockCache::Prune() { - while (!WaitForNotificationWithTimeout(&stop_pruning_thread_, 1000000)) { - mutex_lock lock(mu_); - uint64 now = env_->NowSeconds(); - while (!lra_list_.empty()) { - auto it = block_map_.find(lra_list_.back()); - if (now - it->second->timestamp <= max_staleness_) { - // The oldest block is not yet expired. Come back later. - break; - } - // We need to make a copy of the filename here, since it could otherwise - // be used within RemoveFile_Locked after `it` is deleted. - RemoveFile_Locked(std::string(it->first.first)); - } - } -} - -void FileBlockCache::Flush() { - mutex_lock lock(mu_); - block_map_.clear(); - lru_list_.clear(); - lra_list_.clear(); - cache_size_ = 0; -} - -void FileBlockCache::RemoveFile(const string& filename) { - mutex_lock lock(mu_); - RemoveFile_Locked(filename); -} - -void FileBlockCache::RemoveFile_Locked(const string& filename) { - Key begin = std::make_pair(filename, 0); - auto it = block_map_.lower_bound(begin); - while (it != block_map_.end() && it->first.first == filename) { - auto next = std::next(it); - RemoveBlock(it); - it = next; - } -} - -void FileBlockCache::RemoveBlock(BlockMap::iterator entry) { - // This signals that the block is removed, and should not be inadvertently - // reinserted into the cache in UpdateLRU. - entry->second->timestamp = 0; - lru_list_.erase(entry->second->lru_iterator); - lra_list_.erase(entry->second->lra_iterator); - cache_size_ -= entry->second->data.size(); - block_map_.erase(entry); -} - -} // namespace tensorflow diff --git a/tensorflow/core/platform/cloud/file_block_cache.h b/tensorflow/core/platform/cloud/file_block_cache.h index 5c180e2332..da16788247 100644 --- a/tensorflow/core/platform/cloud/file_block_cache.h +++ b/tensorflow/core/platform/cloud/file_block_cache.h @@ -32,7 +32,7 @@ limitations under the License. namespace tensorflow { -/// \brief An LRU block cache of file contents, keyed by {filename, offset}. +/// \brief A block cache of file contents, keyed by {filename, offset}. /// /// This class should be shared by read-only random access files on a remote /// filesystem (e.g. GCS). @@ -48,27 +48,7 @@ class FileBlockCache { size_t* bytes_transferred)> BlockFetcher; - FileBlockCache(size_t block_size, size_t max_bytes, uint64 max_staleness, - BlockFetcher block_fetcher, Env* env = Env::Default()) - : block_size_(block_size), - max_bytes_(max_bytes), - max_staleness_(max_staleness), - block_fetcher_(block_fetcher), - env_(env) { - if (max_staleness_ > 0) { - pruning_thread_.reset(env_->StartThread(ThreadOptions(), "TF_prune_FBC", - [this] { Prune(); })); - } - } - - ~FileBlockCache() { - if (pruning_thread_) { - stop_pruning_thread_.Notify(); - // Destroying pruning_thread_ will block until Prune() receives the above - // notification and returns. - pruning_thread_.reset(); - } - } + virtual ~FileBlockCache() {} /// Read `n` bytes from `filename` starting at `offset` into `out`. This /// method will return: @@ -84,143 +64,22 @@ class FileBlockCache { /// placed in `out`. /// 4) OK otherwise (i.e. the read succeeded, and at least one byte was placed /// in `out`). 
- Status Read(const string& filename, size_t offset, size_t n, char* buffer, - size_t* bytes_transferred); + virtual Status Read(const string& filename, size_t offset, size_t n, + char* buffer, size_t* bytes_transferred) = 0; /// Remove all cached blocks for `filename`. - void RemoveFile(const string& filename) LOCKS_EXCLUDED(mu_); + virtual void RemoveFile(const string& filename) = 0; /// Remove all cached data. - void Flush() LOCKS_EXCLUDED(mu_); + virtual void Flush() = 0; /// Accessors for cache parameters. - size_t block_size() const { return block_size_; } - size_t max_bytes() const { return max_bytes_; } - uint64 max_staleness() const { return max_staleness_; } + virtual size_t block_size() const = 0; + virtual size_t max_bytes() const = 0; + virtual uint64 max_staleness() const = 0; /// The current size (in bytes) of the cache. - size_t CacheSize() const LOCKS_EXCLUDED(mu_); - - private: - /// The size of the blocks stored in the LRU cache, as well as the size of the - /// reads from the underlying filesystem. - const size_t block_size_; - /// The maximum number of bytes (sum of block sizes) allowed in the LRU cache. - const size_t max_bytes_; - /// The maximum staleness of any block in the LRU cache, in seconds. - const uint64 max_staleness_; - /// The callback to read a block from the underlying filesystem. - const BlockFetcher block_fetcher_; - /// The Env from which we read timestamps. - Env* const env_; // not owned - - /// \brief The key type for the file block cache. - /// - /// The file block cache key is a {filename, offset} pair. - typedef std::pair Key; - - /// \brief The state of a block. - /// - /// A block begins in the CREATED stage. The first thread will attempt to read - /// the block from the filesystem, transitioning the state of the block to - /// FETCHING. After completing, if the read was successful the state should - /// be FINISHED. Otherwise the state should be ERROR. A subsequent read can - /// re-fetch the block if the state is ERROR. - enum class FetchState { - CREATED, - FETCHING, - FINISHED, - ERROR, - }; - - /// \brief A block of a file. - /// - /// A file block consists of the block data, the block's current position in - /// the LRU cache, the timestamp (seconds since epoch) at which the block - /// was cached, a coordination lock, and state & condition variables. - /// - /// Thread safety: - /// The iterator and timestamp fields should only be accessed while holding - /// the block-cache-wide mu_ instance variable. The state variable should only - /// be accessed while holding the Block's mu lock. The data vector should only - /// be accessed after state == FINISHED, and it should never be modified. - /// - /// In order to prevent deadlocks, never grab the block-cache-wide mu_ lock - /// AFTER grabbing any block's mu lock. It is safe to grab mu without locking - /// mu_. - struct Block { - /// The block data. - std::vector data; - /// A list iterator pointing to the block's position in the LRU list. - std::list::iterator lru_iterator; - /// A list iterator pointing to the block's position in the LRA list. - std::list::iterator lra_iterator; - /// The timestamp (seconds since epoch) at which the block was cached. - uint64 timestamp; - /// Mutex to guard state variable - mutex mu; - /// The state of the block. - FetchState state GUARDED_BY(mu) = FetchState::CREATED; - /// Wait on cond_var if state is FETCHING. - condition_variable cond_var; - }; - - /// \brief The block map type for the file block cache. 
- /// - /// The block map is an ordered map from Key to Block. - typedef std::map> BlockMap; - - /// Prune the cache by removing files with expired blocks. - void Prune() LOCKS_EXCLUDED(mu_); - - bool BlockNotStale(const std::shared_ptr& block) - EXCLUSIVE_LOCKS_REQUIRED(mu_); - - /// Look up a Key in the block cache. - std::shared_ptr Lookup(const Key& key) LOCKS_EXCLUDED(mu_); - - Status MaybeFetch(const Key& key, const std::shared_ptr& block) - LOCKS_EXCLUDED(mu_); - - /// Trim the block cache to make room for another entry. - void Trim() EXCLUSIVE_LOCKS_REQUIRED(mu_); - - /// Update the LRU iterator for the block at `key`. - Status UpdateLRU(const Key& key, const std::shared_ptr& block) - LOCKS_EXCLUDED(mu_); - - /// Remove all blocks of a file, with mu_ already held. - void RemoveFile_Locked(const string& filename) EXCLUSIVE_LOCKS_REQUIRED(mu_); - - /// Remove the block `entry` from the block map and LRU list, and update the - /// cache size accordingly. - void RemoveBlock(BlockMap::iterator entry) EXCLUSIVE_LOCKS_REQUIRED(mu_); - - /// The cache pruning thread that removes files with expired blocks. - std::unique_ptr pruning_thread_; - - /// Notification for stopping the cache pruning thread. - Notification stop_pruning_thread_; - - /// Guards access to the block map, LRU list, and cached byte count. - mutable mutex mu_; - - /// The block map (map from Key to Block). - BlockMap block_map_ GUARDED_BY(mu_); - - /// The LRU list of block keys. The front of the list identifies the most - /// recently accessed block. - std::list lru_list_ GUARDED_BY(mu_); - - /// The LRA (least recently added) list of block keys. The front of the list - /// identifies the most recently added block. - /// - /// Note: blocks are added to lra_list_ only after they have successfully been - /// fetched from the underlying block store. - std::list lra_list_ GUARDED_BY(mu_); - - /// The combined number of bytes in all of the cached blocks. - size_t cache_size_ GUARDED_BY(mu_) = 0; + virtual size_t CacheSize() const = 0; }; } // namespace tensorflow diff --git a/tensorflow/core/platform/cloud/file_block_cache_test.cc b/tensorflow/core/platform/cloud/file_block_cache_test.cc deleted file mode 100644 index 596fdbf19e..0000000000 --- a/tensorflow/core/platform/cloud/file_block_cache_test.cc +++ /dev/null @@ -1,519 +0,0 @@ -/* Copyright 2017 The TensorFlow Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-==============================================================================*/ - -#include "tensorflow/core/platform/cloud/file_block_cache.h" -#include -#include "tensorflow/core/lib/core/blocking_counter.h" -#include "tensorflow/core/lib/core/status_test_util.h" -#include "tensorflow/core/platform/cloud/now_seconds_env.h" -#include "tensorflow/core/platform/env.h" -#include "tensorflow/core/platform/notification.h" -#include "tensorflow/core/platform/test.h" - -namespace tensorflow { -namespace { - -Status ReadCache(FileBlockCache* cache, const string& filename, size_t offset, - size_t n, std::vector* out) { - out->clear(); - out->resize(n, 0); - size_t bytes_transferred = 0; - Status status = - cache->Read(filename, offset, n, out->data(), &bytes_transferred); - EXPECT_LE(bytes_transferred, n); - out->resize(bytes_transferred, n); - return status; -} - -TEST(FileBlockCacheTest, PassThrough) { - const string want_filename = "foo/bar"; - const size_t want_offset = 42; - const size_t want_n = 1024; - int calls = 0; - auto fetcher = [&calls, want_filename, want_offset, want_n]( - const string& got_filename, size_t got_offset, - size_t got_n, char* buffer, size_t* bytes_transferred) { - EXPECT_EQ(got_filename, want_filename); - EXPECT_EQ(got_offset, want_offset); - EXPECT_EQ(got_n, want_n); - calls++; - memset(buffer, 'x', got_n); - *bytes_transferred = got_n; - return Status::OK(); - }; - // If block_size, max_bytes, or both are zero, the cache is a pass-through. - FileBlockCache cache1(1, 0, 0, fetcher); - FileBlockCache cache2(0, 1, 0, fetcher); - FileBlockCache cache3(0, 0, 0, fetcher); - std::vector out; - TF_EXPECT_OK(ReadCache(&cache1, want_filename, want_offset, want_n, &out)); - EXPECT_EQ(calls, 1); - TF_EXPECT_OK(ReadCache(&cache2, want_filename, want_offset, want_n, &out)); - EXPECT_EQ(calls, 2); - TF_EXPECT_OK(ReadCache(&cache3, want_filename, want_offset, want_n, &out)); - EXPECT_EQ(calls, 3); -} - -TEST(FileBlockCacheTest, BlockAlignment) { - // Initialize a 256-byte buffer. This is the file underlying the reads we'll - // do in this test. - const size_t size = 256; - std::vector buf; - for (int i = 0; i < size; i++) { - buf.push_back(i); - } - // The fetcher just fetches slices of the buffer. - auto fetcher = [&buf](const string& filename, size_t offset, size_t n, - char* buffer, size_t* bytes_transferred) { - if (offset < buf.size()) { - size_t bytes_to_copy = std::min(buf.size() - offset, n); - memcpy(buffer, buf.data() + offset, bytes_to_copy); - *bytes_transferred = bytes_to_copy; - } else { - *bytes_transferred = 0; - } - return Status::OK(); - }; - for (size_t block_size = 2; block_size <= 4; block_size++) { - // Make a cache of N-byte block size (1 block) and verify that reads of - // varying offsets and lengths return correct data. - FileBlockCache cache(block_size, block_size, 0, fetcher); - for (size_t offset = 0; offset < 10; offset++) { - for (size_t n = block_size - 2; n <= block_size + 2; n++) { - std::vector got; - TF_EXPECT_OK(ReadCache(&cache, "", offset, n, &got)); - // Verify the size of the read. - if (offset + n <= size) { - // Expect a full read. - EXPECT_EQ(got.size(), n) << "block size = " << block_size - << ", offset = " << offset << ", n = " << n; - } else { - // Expect a partial read. - EXPECT_EQ(got.size(), size - offset) - << "block size = " << block_size << ", offset = " << offset - << ", n = " << n; - } - // Verify the contents of the read. 
- std::vector::const_iterator begin = buf.begin() + offset; - std::vector::const_iterator end = - offset + n > buf.size() ? buf.end() : begin + n; - std::vector want(begin, end); - EXPECT_EQ(got, want) << "block size = " << block_size - << ", offset = " << offset << ", n = " << n; - } - } - } -} - -TEST(FileBlockCacheTest, CacheHits) { - const size_t block_size = 16; - std::set calls; - auto fetcher = [&calls, block_size](const string& filename, size_t offset, - size_t n, char* buffer, - size_t* bytes_transferred) { - EXPECT_EQ(n, block_size); - EXPECT_EQ(offset % block_size, 0); - EXPECT_EQ(calls.find(offset), calls.end()) << "at offset " << offset; - calls.insert(offset); - memset(buffer, 'x', n); - *bytes_transferred = n; - return Status::OK(); - }; - const uint32 block_count = 256; - FileBlockCache cache(block_size, block_count * block_size, 0, fetcher); - std::vector out; - out.resize(block_count, 0); - // The cache has space for `block_count` blocks. The loop with i = 0 should - // fill the cache, and the loop with i = 1 should be all cache hits. The - // fetcher checks that it is called once and only once for each offset (to - // fetch the corresponding block). - for (int i = 0; i < 2; i++) { - for (int j = 0; j < block_count; j++) { - TF_EXPECT_OK(ReadCache(&cache, "", block_size * j, block_size, &out)); - } - } -} - -TEST(FileBlockCacheTest, OutOfRange) { - // Tests reads of a 24-byte file with block size 16. - const size_t block_size = 16; - const size_t file_size = 24; - bool first_block = false; - bool second_block = false; - auto fetcher = [block_size, file_size, &first_block, &second_block]( - const string& filename, size_t offset, size_t n, - char* buffer, size_t* bytes_transferred) { - EXPECT_EQ(n, block_size); - EXPECT_EQ(offset % block_size, 0); - size_t bytes_to_copy = 0; - if (offset == 0) { - // The first block (16 bytes) of the file. - memset(buffer, 'x', n); - bytes_to_copy = n; - first_block = true; - } else if (offset == block_size) { - // The second block (8 bytes) of the file. - bytes_to_copy = file_size - block_size; - memset(buffer, 'x', bytes_to_copy); - second_block = true; - } - *bytes_transferred = bytes_to_copy; - return Status::OK(); - }; - FileBlockCache cache(block_size, block_size, 0, fetcher); - std::vector out; - // Reading the first 16 bytes should be fine. - TF_EXPECT_OK(ReadCache(&cache, "", 0, block_size, &out)); - EXPECT_TRUE(first_block); - EXPECT_EQ(out.size(), block_size); - // Reading at offset file_size + 4 will read the second block (since the read - // at file_size + 4 = 28 will be aligned to an offset of 16) but will return - // OutOfRange because the offset is past the end of the 24-byte file. - Status status = ReadCache(&cache, "", file_size + 4, 4, &out); - EXPECT_EQ(status.code(), error::OUT_OF_RANGE); - EXPECT_TRUE(second_block); - // Reading the second full block will return 8 bytes, from a cache hit. - second_block = false; - TF_EXPECT_OK(ReadCache(&cache, "", block_size, block_size, &out)); - EXPECT_FALSE(second_block); - EXPECT_EQ(out.size(), file_size - block_size); -} - -TEST(FileBlockCacheTest, Inconsistent) { - // Tests the detection of interrupted reads leading to partially filled blocks - // where we expected complete blocks. - const size_t block_size = 16; - // This fetcher returns OK but only fills in one byte for any offset. 
- auto fetcher = [block_size](const string& filename, size_t offset, size_t n, - char* buffer, size_t* bytes_transferred) { - EXPECT_EQ(n, block_size); - EXPECT_EQ(offset % block_size, 0); - EXPECT_GE(n, 1); - memset(buffer, 'x', 1); - *bytes_transferred = 1; - return Status::OK(); - }; - FileBlockCache cache(block_size, 2 * block_size, 0, fetcher); - std::vector out; - // Read the second block; this should yield an OK status and a single byte. - TF_EXPECT_OK(ReadCache(&cache, "", block_size, block_size, &out)); - EXPECT_EQ(out.size(), 1); - // Now read the first block; this should yield an INTERNAL error because we - // had already cached a partial block at a later position. - Status status = ReadCache(&cache, "", 0, block_size, &out); - EXPECT_EQ(status.code(), error::INTERNAL); -} - -TEST(FileBlockCacheTest, LRU) { - const size_t block_size = 16; - std::list calls; - auto fetcher = [&calls, block_size](const string& filename, size_t offset, - size_t n, char* buffer, - size_t* bytes_transferred) { - EXPECT_EQ(n, block_size); - EXPECT_FALSE(calls.empty()) << "at offset = " << offset; - if (!calls.empty()) { - EXPECT_EQ(offset, calls.front()); - calls.pop_front(); - } - memset(buffer, 'x', n); - *bytes_transferred = n; - return Status::OK(); - }; - const uint32 block_count = 2; - FileBlockCache cache(block_size, block_count * block_size, 0, fetcher); - std::vector out; - // Read blocks from the cache, and verify the LRU behavior based on the - // fetcher calls that the cache makes. - calls.push_back(0); - // Cache miss - drains an element from `calls`. - TF_EXPECT_OK(ReadCache(&cache, "", 0, 1, &out)); - // Cache hit - does not drain an element from `calls`. - TF_EXPECT_OK(ReadCache(&cache, "", 0, 1, &out)); - calls.push_back(block_size); - // Cache miss followed by cache hit. - TF_EXPECT_OK(ReadCache(&cache, "", block_size, 1, &out)); - TF_EXPECT_OK(ReadCache(&cache, "", block_size, 1, &out)); - calls.push_back(2 * block_size); - // Cache miss followed by cache hit. Causes eviction of LRU element. - TF_EXPECT_OK(ReadCache(&cache, "", 2 * block_size, 1, &out)); - TF_EXPECT_OK(ReadCache(&cache, "", 2 * block_size, 1, &out)); - // LRU element was at offset 0. Cache miss. - calls.push_back(0); - TF_EXPECT_OK(ReadCache(&cache, "", 0, 1, &out)); - // Element at 2 * block_size is still in cache, and this read should update - // its position in the LRU list so it doesn't get evicted by the next read. - TF_EXPECT_OK(ReadCache(&cache, "", 2 * block_size, 1, &out)); - // Element at block_size was evicted. Reading this element will also cause - // the LRU element (at 0) to be evicted. - calls.push_back(block_size); - TF_EXPECT_OK(ReadCache(&cache, "", block_size, 1, &out)); - // Element at 0 was evicted again. - calls.push_back(0); - TF_EXPECT_OK(ReadCache(&cache, "", 0, 1, &out)); -} - -TEST(FileBlockCacheTest, MaxStaleness) { - int calls = 0; - auto fetcher = [&calls](const string& filename, size_t offset, size_t n, - char* buffer, size_t* bytes_transferred) { - calls++; - memset(buffer, 'x', n); - *bytes_transferred = n; - return Status::OK(); - }; - std::vector out; - std::unique_ptr env(new NowSecondsEnv); - // Create a cache with max staleness of 2 seconds, and verify that it works as - // expected. - FileBlockCache cache1(8, 16, 2 /* max staleness */, fetcher, env.get()); - // Execute the first read to load the block. - TF_EXPECT_OK(ReadCache(&cache1, "", 0, 1, &out)); - EXPECT_EQ(calls, 1); - // Now advance the clock one second at a time and redo the read. 
The call - // count should advance every 3 seconds (i.e. every time the staleness is - // greater than 2). - for (int i = 1; i <= 10; i++) { - env->SetNowSeconds(i + 1); - TF_EXPECT_OK(ReadCache(&cache1, "", 0, 1, &out)); - EXPECT_EQ(calls, 1 + i / 3); - } - // Now create a cache with max staleness of 0, and verify that it also works - // as expected. - calls = 0; - env->SetNowSeconds(0); - FileBlockCache cache2(8, 16, 0 /* max staleness */, fetcher, env.get()); - // Execute the first read to load the block. - TF_EXPECT_OK(ReadCache(&cache2, "", 0, 1, &out)); - EXPECT_EQ(calls, 1); - // Advance the clock by a huge amount and verify that the cached block is - // used to satisfy the read. - env->SetNowSeconds(365 * 24 * 60 * 60); // ~1 year, just for fun. - TF_EXPECT_OK(ReadCache(&cache2, "", 0, 1, &out)); - EXPECT_EQ(calls, 1); -} - -TEST(FileBlockCacheTest, RemoveFile) { - int calls = 0; - auto fetcher = [&calls](const string& filename, size_t offset, size_t n, - char* buffer, size_t* bytes_transferred) { - calls++; - char c = (filename == "a") ? 'a' : (filename == "b") ? 'b' : 'x'; - if (offset > 0) { - // The first block is lower case and all subsequent blocks are upper case. - c = toupper(c); - } - memset(buffer, c, n); - *bytes_transferred = n; - return Status::OK(); - }; - // This cache has space for 4 blocks; we'll read from two files. - const size_t n = 3; - FileBlockCache cache(8, 32, 0, fetcher); - std::vector out; - std::vector a(n, 'a'); - std::vector b(n, 'b'); - std::vector A(n, 'A'); - std::vector B(n, 'B'); - // Fill the cache. - TF_EXPECT_OK(ReadCache(&cache, "a", 0, n, &out)); - EXPECT_EQ(out, a); - EXPECT_EQ(calls, 1); - TF_EXPECT_OK(ReadCache(&cache, "a", 8, n, &out)); - EXPECT_EQ(out, A); - EXPECT_EQ(calls, 2); - TF_EXPECT_OK(ReadCache(&cache, "b", 0, n, &out)); - EXPECT_EQ(out, b); - EXPECT_EQ(calls, 3); - TF_EXPECT_OK(ReadCache(&cache, "b", 8, n, &out)); - EXPECT_EQ(out, B); - EXPECT_EQ(calls, 4); - // All four blocks should be in the cache now. - TF_EXPECT_OK(ReadCache(&cache, "a", 0, n, &out)); - EXPECT_EQ(out, a); - TF_EXPECT_OK(ReadCache(&cache, "a", 8, n, &out)); - EXPECT_EQ(out, A); - TF_EXPECT_OK(ReadCache(&cache, "b", 0, n, &out)); - EXPECT_EQ(out, b); - TF_EXPECT_OK(ReadCache(&cache, "b", 8, n, &out)); - EXPECT_EQ(out, B); - EXPECT_EQ(calls, 4); - // Remove the blocks from "a". - cache.RemoveFile("a"); - // Both blocks from "b" should still be there. - TF_EXPECT_OK(ReadCache(&cache, "b", 0, n, &out)); - EXPECT_EQ(out, b); - TF_EXPECT_OK(ReadCache(&cache, "b", 8, n, &out)); - EXPECT_EQ(out, B); - EXPECT_EQ(calls, 4); - // The blocks from "a" should not be there. - TF_EXPECT_OK(ReadCache(&cache, "a", 0, n, &out)); - EXPECT_EQ(out, a); - EXPECT_EQ(calls, 5); - TF_EXPECT_OK(ReadCache(&cache, "a", 8, n, &out)); - EXPECT_EQ(out, A); - EXPECT_EQ(calls, 6); -} - -TEST(FileBlockCacheTest, Prune) { - int calls = 0; - auto fetcher = [&calls](const string& filename, size_t offset, size_t n, - char* buffer, size_t* bytes_transferred) { - calls++; - memset(buffer, 'x', n); - *bytes_transferred = n; - return Status::OK(); - }; - std::vector out; - // Our fake environment is initialized with the current timestamp. - std::unique_ptr env(new NowSecondsEnv); - uint64 now = Env::Default()->NowSeconds(); - env->SetNowSeconds(now); - FileBlockCache cache(8, 32, 1 /* max staleness */, fetcher, env.get()); - // Read three blocks into the cache, and advance the timestamp by one second - // with each read. Start with a block of "a" at the current timestamp `now`. 
- TF_EXPECT_OK(ReadCache(&cache, "a", 0, 1, &out)); - // Now load a block of a different file "b" at timestamp `now` + 1 - env->SetNowSeconds(now + 1); - TF_EXPECT_OK(ReadCache(&cache, "b", 0, 1, &out)); - // Now load a different block of file "a" at timestamp `now` + 1. When the - // first block of "a" expires, this block should also be removed because it - // also belongs to file "a". - TF_EXPECT_OK(ReadCache(&cache, "a", 8, 1, &out)); - // Ensure that all blocks are in the cache (i.e. reads are cache hits). - EXPECT_EQ(cache.CacheSize(), 24); - EXPECT_EQ(calls, 3); - TF_EXPECT_OK(ReadCache(&cache, "a", 0, 1, &out)); - TF_EXPECT_OK(ReadCache(&cache, "b", 0, 1, &out)); - TF_EXPECT_OK(ReadCache(&cache, "a", 8, 1, &out)); - EXPECT_EQ(calls, 3); - // Advance the fake timestamp so that "a" becomes stale via its first block. - env->SetNowSeconds(now + 2); - // The pruning thread periodically compares env->NowSeconds() with the oldest - // block's timestamp to see if it should evict any files. At the current fake - // timestamp of `now` + 2, file "a" is stale because its first block is stale, - // but file "b" is not stale yet. Thus, once the pruning thread wakes up (in - // one second of wall time), it should remove "a" and leave "b" alone. - uint64 start = Env::Default()->NowSeconds(); - do { - Env::Default()->SleepForMicroseconds(100000); - } while (cache.CacheSize() == 24 && Env::Default()->NowSeconds() - start < 3); - // There should be one block left in the cache, and it should be the first - // block of "b". - EXPECT_EQ(cache.CacheSize(), 8); - TF_EXPECT_OK(ReadCache(&cache, "b", 0, 1, &out)); - EXPECT_EQ(calls, 3); - // Advance the fake time to `now` + 3, at which point "b" becomes stale. - env->SetNowSeconds(now + 3); - // Wait for the pruner to remove "b". - start = Env::Default()->NowSeconds(); - do { - Env::Default()->SleepForMicroseconds(100000); - } while (cache.CacheSize() == 8 && Env::Default()->NowSeconds() - start < 3); - // The cache should now be empty. - EXPECT_EQ(cache.CacheSize(), 0); -} - -TEST(FileBlockCacheTest, ParallelReads) { - // This fetcher won't respond until either `callers` threads are calling it - // concurrently (at which point it will respond with success to all callers), - // or 10 seconds have elapsed (at which point it will respond with an error). - const int callers = 4; - BlockingCounter counter(callers); - auto fetcher = [&counter](const string& filename, size_t offset, size_t n, - char* buffer, size_t* bytes_transferred) { - counter.DecrementCount(); - if (!counter.WaitFor(std::chrono::seconds(10))) { - // This avoids having the test time out, which is harder to debug. - return errors::FailedPrecondition("desired concurrency not reached"); - } - memset(buffer, 'x', n); - *bytes_transferred = n; - return Status::OK(); - }; - const int block_size = 8; - FileBlockCache cache(block_size, 2 * callers * block_size, 0, fetcher); - std::vector> threads; - for (int i = 0; i < callers; i++) { - threads.emplace_back( - Env::Default()->StartThread({}, "caller", [&cache, i, block_size]() { - std::vector out; - TF_EXPECT_OK( - ReadCache(&cache, "a", i * block_size, block_size, &out)); - std::vector x(block_size, 'x'); - EXPECT_EQ(out, x); - })); - } - // The `threads` destructor blocks until the threads can be joined, once their - // respective reads finish (which happens once they are all concurrently being - // executed, or 10 seconds have passed). 
-} - -TEST(FileBlockCacheTest, CoalesceConcurrentReads) { - // Concurrent reads to the same file blocks should be de-duplicated. - const size_t block_size = 16; - int num_requests = 0; - Notification notification; - auto fetcher = [&num_requests, ¬ification, block_size]( - const string& filename, size_t offset, size_t n, - char* buffer, size_t* bytes_transferred) { - EXPECT_EQ(n, block_size); - EXPECT_EQ(offset, 0); - num_requests++; - memset(buffer, 'x', n); - *bytes_transferred = n; - notification.Notify(); - // Wait for other thread to issue read. - Env::Default()->SleepForMicroseconds(100000); // 0.1 secs - return Status::OK(); - }; - FileBlockCache cache(block_size, block_size, 0, fetcher); - // Fork off thread for parallel read. - std::unique_ptr concurrent( - Env::Default()->StartThread({}, "concurrent", [&cache, block_size] { - std::vector out; - TF_EXPECT_OK(ReadCache(&cache, "", 0, block_size / 2, &out)); - EXPECT_EQ(out.size(), block_size / 2); - })); - EXPECT_TRUE(WaitForNotificationWithTimeout(¬ification, 10000)) - << "Timeout waiting for concurrent thread to start."; - std::vector out; - TF_EXPECT_OK(ReadCache(&cache, "", block_size / 2, block_size / 2, &out)); - EXPECT_EQ(out.size(), block_size / 2); - - EXPECT_EQ(1, num_requests); -} - -TEST(FileBlockCacheTest, Flush) { - int calls = 0; - auto fetcher = [&calls](const string& filename, size_t offset, size_t n, - char* buffer, size_t* bytes_transferred) { - calls++; - memset(buffer, 'x', n); - *bytes_transferred = n; - return Status::OK(); - }; - FileBlockCache cache(16, 32, 0, fetcher); - std::vector out; - TF_EXPECT_OK(ReadCache(&cache, "", 0, 16, &out)); - TF_EXPECT_OK(ReadCache(&cache, "", 0, 16, &out)); - EXPECT_EQ(calls, 1); - cache.Flush(); - TF_EXPECT_OK(ReadCache(&cache, "", 0, 16, &out)); - EXPECT_EQ(calls, 2); -} - -} // namespace -} // namespace tensorflow diff --git a/tensorflow/core/platform/cloud/gcs_file_system.cc b/tensorflow/core/platform/cloud/gcs_file_system.cc index 01ca0d76ba..84b65cec4f 100644 --- a/tensorflow/core/platform/cloud/gcs_file_system.cc +++ b/tensorflow/core/platform/cloud/gcs_file_system.cc @@ -36,6 +36,7 @@ limitations under the License. #include "tensorflow/core/platform/cloud/curl_http_request.h" #include "tensorflow/core/platform/cloud/file_block_cache.h" #include "tensorflow/core/platform/cloud/google_auth_provider.h" +#include "tensorflow/core/platform/cloud/ram_file_block_cache.h" #include "tensorflow/core/platform/cloud/retrying_utils.h" #include "tensorflow/core/platform/cloud/time_util.h" #include "tensorflow/core/platform/env.h" @@ -783,13 +784,13 @@ Status GcsFileSystem::NewRandomAccessFile( // A helper function to build a FileBlockCache for GcsFileSystem. 
 std::unique_ptr<FileBlockCache> GcsFileSystem::MakeFileBlockCache(
     size_t block_size, size_t max_bytes, uint64 max_staleness) {
-  std::unique_ptr<FileBlockCache> file_block_cache(
-      new FileBlockCache(block_size, max_bytes, max_staleness,
-                         [this](const string& filename, size_t offset, size_t n,
-                                char* buffer, size_t* bytes_transferred) {
-                           return LoadBufferFromGCS(filename, offset, n, buffer,
-                                                    bytes_transferred);
-                         }));
+  std::unique_ptr<FileBlockCache> file_block_cache(new RamFileBlockCache(
+      block_size, max_bytes, max_staleness,
+      [this](const string& filename, size_t offset, size_t n, char* buffer,
+             size_t* bytes_transferred) {
+        return LoadBufferFromGCS(filename, offset, n, buffer,
+                                 bytes_transferred);
+      }));
   return file_block_cache;
 }
diff --git a/tensorflow/core/platform/cloud/ram_file_block_cache.cc b/tensorflow/core/platform/cloud/ram_file_block_cache.cc
new file mode 100644
index 0000000000..55a5657a50
--- /dev/null
+++ b/tensorflow/core/platform/cloud/ram_file_block_cache.cc
@@ -0,0 +1,275 @@
+/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#include "tensorflow/core/platform/cloud/ram_file_block_cache.h"
+#include <cstring>
+#include <memory>
+#include "tensorflow/core/lib/gtl/cleanup.h"
+#include "tensorflow/core/platform/env.h"
+
+namespace tensorflow {
+
+bool RamFileBlockCache::BlockNotStale(const std::shared_ptr<Block>& block) {
+  mutex_lock l(block->mu);
+  if (block->state != FetchState::FINISHED) {
+    return true;  // No need to check for staleness.
+  }
+  if (max_staleness_ == 0) return true;  // Not enforcing staleness.
+  return env_->NowSeconds() - block->timestamp <= max_staleness_;
+}
+
+std::shared_ptr<RamFileBlockCache::Block> RamFileBlockCache::Lookup(
+    const Key& key) {
+  mutex_lock lock(mu_);
+  auto entry = block_map_.find(key);
+  if (entry != block_map_.end()) {
+    if (BlockNotStale(entry->second)) {
+      return entry->second;
+    } else {
+      // Remove the stale block and continue.
+      RemoveFile_Locked(key.first);
+    }
+  }
+
+  // Insert a new empty block, setting the bookkeeping to sentinel values
+  // in order to update them as appropriate.
+  auto new_entry = std::make_shared<Block>();
+  lru_list_.push_front(key);
+  lra_list_.push_front(key);
+  new_entry->lru_iterator = lru_list_.begin();
+  new_entry->lra_iterator = lra_list_.begin();
+  new_entry->timestamp = env_->NowSeconds();
+  block_map_.emplace(std::make_pair(key, new_entry));
+  return new_entry;
+}
+
+// Remove blocks from the cache until we do not exceed our maximum size.
+void RamFileBlockCache::Trim() {
+  while (!lru_list_.empty() && cache_size_ > max_bytes_) {
+    RemoveBlock(block_map_.find(lru_list_.back()));
+  }
+}
+
+/// Move the block to the front of the LRU list if it isn't already there.
+Status RamFileBlockCache::UpdateLRU(const Key& key,
+                                    const std::shared_ptr<Block>& block) {
+  mutex_lock lock(mu_);
+  if (block->timestamp == 0) {
+    // The block was evicted from another thread. Allow it to remain evicted.
+    return Status::OK();
+  }
+  if (block->lru_iterator != lru_list_.begin()) {
+    lru_list_.erase(block->lru_iterator);
+    lru_list_.push_front(key);
+    block->lru_iterator = lru_list_.begin();
+  }
+
+  // Check for inconsistent state. If there is a block later in the same file
+  // in the cache, and our current block is not block size, this likely means
+  // we have inconsistent state within the cache. Note: it's possible some
+  // incomplete reads may still go undetected.
+  if (block->data.size() < block_size_) {
+    Key fmax = std::make_pair(key.first, std::numeric_limits<size_t>::max());
+    auto fcmp = block_map_.upper_bound(fmax);
+    if (fcmp != block_map_.begin() && key < (--fcmp)->first) {
+      return errors::Internal("Block cache contents are inconsistent.");
+    }
+  }
+
+  Trim();
+
+  return Status::OK();
+}
+
+Status RamFileBlockCache::MaybeFetch(const Key& key,
+                                     const std::shared_ptr<Block>& block) {
+  bool downloaded_block = false;
+  auto reconcile_state =
+      gtl::MakeCleanup([this, &downloaded_block, &key, &block] {
+        // Perform this action in a cleanup callback to avoid locking mu_ after
+        // locking block->mu.
+        if (downloaded_block) {
+          mutex_lock l(mu_);
+          // Do not update state if the block is already to be evicted.
+          if (block->timestamp != 0) {
+            cache_size_ += block->data.size();
+            // Put to beginning of LRA list.
+            lra_list_.erase(block->lra_iterator);
+            lra_list_.push_front(key);
+            block->lra_iterator = lra_list_.begin();
+            block->timestamp = env_->NowSeconds();
+          }
+        }
+      });
+  // Loop until either block content is successfully fetched, or our request
+  // encounters an error.
+  mutex_lock l(block->mu);
+  Status status = Status::OK();
+  while (true) {
+    switch (block->state) {
+      case FetchState::ERROR:
+        TF_FALLTHROUGH_INTENDED;
+      case FetchState::CREATED:
+        block->state = FetchState::FETCHING;
+        block->mu.unlock();  // Release the lock while making the API call.
+        block->data.clear();
+        block->data.resize(block_size_, 0);
+        size_t bytes_transferred;
+        status.Update(block_fetcher_(key.first, key.second, block_size_,
+                                     block->data.data(), &bytes_transferred));
+        block->mu.lock();  // Reacquire the lock immediately afterwards
+        if (status.ok()) {
+          block->data.resize(bytes_transferred, 0);
+          block->data.shrink_to_fit();
+          downloaded_block = true;
+          block->state = FetchState::FINISHED;
+        } else {
+          block->state = FetchState::ERROR;
+        }
+        block->cond_var.notify_all();
+        return status;
+      case FetchState::FETCHING:
+        block->cond_var.wait_for(l, std::chrono::seconds(60));
+        if (block->state == FetchState::FINISHED) {
+          return Status::OK();
+        }
+        // Re-loop in case of errors.
+        break;
+      case FetchState::FINISHED:
+        return Status::OK();
+    }
+  }
+  return errors::Internal(
+      "Control flow should never reach the end of RamFileBlockCache::Fetch.");
+}
+
+Status RamFileBlockCache::Read(const string& filename, size_t offset, size_t n,
+                               char* buffer, size_t* bytes_transferred) {
+  *bytes_transferred = 0;
+  if (n == 0) {
+    return Status::OK();
+  }
+  if (block_size_ == 0 || max_bytes_ == 0) {
+    // The cache is effectively disabled, so we pass the read through to the
+    // fetcher without breaking it up into blocks.
+    return block_fetcher_(filename, offset, n, buffer, bytes_transferred);
+  }
+  // Calculate the block-aligned start and end of the read.
+  size_t start = block_size_ * (offset / block_size_);
+  size_t finish = block_size_ * ((offset + n) / block_size_);
+  if (finish < offset + n) {
+    finish += block_size_;
+  }
+  size_t total_bytes_transferred = 0;
+  // Now iterate through the blocks, reading them one at a time.
+  for (size_t pos = start; pos < finish; pos += block_size_) {
+    Key key = std::make_pair(filename, pos);
+    // Look up the block, fetching and inserting it if necessary, and update the
+    // LRU iterator for the key and block.
+    std::shared_ptr<Block> block = Lookup(key);
+    DCHECK(block) << "No block for key " << key.first << "@" << key.second;
+    TF_RETURN_IF_ERROR(MaybeFetch(key, block));
+    TF_RETURN_IF_ERROR(UpdateLRU(key, block));
+    // Copy the relevant portion of the block into the result buffer.
+    const auto& data = block->data;
+    if (offset >= pos + data.size()) {
+      // The requested offset is at or beyond the end of the file. This can
+      // happen if `offset` is not block-aligned, and the read returns the last
+      // block in the file, which does not extend all the way out to `offset`.
+      *bytes_transferred = total_bytes_transferred;
+      return errors::OutOfRange("EOF at offset ", offset, " in file ", filename,
+                                " at position ", pos, "with data size ",
+                                data.size());
+    }
+    auto begin = data.begin();
+    if (offset > pos) {
+      // The block begins before the slice we're reading.
+      begin += offset - pos;
+    }
+    auto end = data.end();
+    if (pos + data.size() > offset + n) {
+      // The block extends past the end of the slice we're reading.
+      end -= (pos + data.size()) - (offset + n);
+    }
+    if (begin < end) {
+      size_t bytes_to_copy = end - begin;
+      memcpy(&buffer[total_bytes_transferred], &*begin, bytes_to_copy);
+      total_bytes_transferred += bytes_to_copy;
+    }
+    if (data.size() < block_size_) {
+      // The block was a partial block and thus signals EOF at its upper bound.
+      break;
+    }
+  }
+  *bytes_transferred = total_bytes_transferred;
+  return Status::OK();
+}
+
+size_t RamFileBlockCache::CacheSize() const {
+  mutex_lock lock(mu_);
+  return cache_size_;
+}
+
+void RamFileBlockCache::Prune() {
+  while (!WaitForNotificationWithTimeout(&stop_pruning_thread_, 1000000)) {
+    mutex_lock lock(mu_);
+    uint64 now = env_->NowSeconds();
+    while (!lra_list_.empty()) {
+      auto it = block_map_.find(lra_list_.back());
+      if (now - it->second->timestamp <= max_staleness_) {
+        // The oldest block is not yet expired. Come back later.
+        break;
+      }
+      // We need to make a copy of the filename here, since it could otherwise
+      // be used within RemoveFile_Locked after `it` is deleted.
+      RemoveFile_Locked(std::string(it->first.first));
+    }
+  }
+}
+
+void RamFileBlockCache::Flush() {
+  mutex_lock lock(mu_);
+  block_map_.clear();
+  lru_list_.clear();
+  lra_list_.clear();
+  cache_size_ = 0;
+}
+
+void RamFileBlockCache::RemoveFile(const string& filename) {
+  mutex_lock lock(mu_);
+  RemoveFile_Locked(filename);
+}
+
+void RamFileBlockCache::RemoveFile_Locked(const string& filename) {
+  Key begin = std::make_pair(filename, 0);
+  auto it = block_map_.lower_bound(begin);
+  while (it != block_map_.end() && it->first.first == filename) {
+    auto next = std::next(it);
+    RemoveBlock(it);
+    it = next;
+  }
+}
+
+void RamFileBlockCache::RemoveBlock(BlockMap::iterator entry) {
+  // This signals that the block is removed, and should not be inadvertently
+  // reinserted into the cache in UpdateLRU.
+  entry->second->timestamp = 0;
+  lru_list_.erase(entry->second->lru_iterator);
+  lra_list_.erase(entry->second->lra_iterator);
+  cache_size_ -= entry->second->data.size();
+  block_map_.erase(entry);
+}
+
+}  // namespace tensorflow
diff --git a/tensorflow/core/platform/cloud/ram_file_block_cache.h b/tensorflow/core/platform/cloud/ram_file_block_cache.h
new file mode 100644
index 0000000000..7fdd7b2e02
--- /dev/null
+++ b/tensorflow/core/platform/cloud/ram_file_block_cache.h
@@ -0,0 +1,229 @@
+/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#ifndef TENSORFLOW_CORE_PLATFORM_CLOUD_RAM_FILE_BLOCK_CACHE_H_
+#define TENSORFLOW_CORE_PLATFORM_CLOUD_RAM_FILE_BLOCK_CACHE_H_
+
+#include <functional>
+#include <list>
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+#include "tensorflow/core/lib/core/status.h"
+#include "tensorflow/core/lib/core/stringpiece.h"
+#include "tensorflow/core/platform/cloud/file_block_cache.h"
+#include "tensorflow/core/platform/env.h"
+#include "tensorflow/core/platform/mutex.h"
+#include "tensorflow/core/platform/notification.h"
+#include "tensorflow/core/platform/thread_annotations.h"
+#include "tensorflow/core/platform/types.h"
+
+namespace tensorflow {
+
+/// \brief An LRU block cache of file contents, keyed by {filename, offset}.
+///
+/// This class should be shared by read-only random access files on a remote
+/// filesystem (e.g. GCS).
+class RamFileBlockCache : public FileBlockCache {
+ public:
+  /// The callback executed when a block is not found in the cache, and needs to
+  /// be fetched from the backing filesystem. This callback is provided when the
+  /// cache is constructed. The returned Status should be OK as long as the
+  /// read from the remote filesystem succeeded (similar to the semantics of the
+  /// read(2) system call).
+  typedef std::function<Status(const string& filename, size_t offset, size_t n,
+                               char* buffer, size_t* bytes_transferred)>
+      BlockFetcher;
+
+  RamFileBlockCache(size_t block_size, size_t max_bytes, uint64 max_staleness,
+                    BlockFetcher block_fetcher, Env* env = Env::Default())
+      : block_size_(block_size),
+        max_bytes_(max_bytes),
+        max_staleness_(max_staleness),
+        block_fetcher_(block_fetcher),
+        env_(env) {
+    if (max_staleness_ > 0) {
+      pruning_thread_.reset(env_->StartThread(ThreadOptions(), "TF_prune_FBC",
+                                              [this] { Prune(); }));
+    }
+  }
+
+  ~RamFileBlockCache() override {
+    if (pruning_thread_) {
+      stop_pruning_thread_.Notify();
+      // Destroying pruning_thread_ will block until Prune() receives the above
+      // notification and returns.
+      pruning_thread_.reset();
+    }
+  }
+
+  /// Read `n` bytes from `filename` starting at `offset` into `out`. This
+  /// method will return:
+  ///
+  /// 1) The error from the remote filesystem, if the read from the remote
+  ///    filesystem failed.
+  /// 2) PRECONDITION_FAILED if the read from the remote filesystem succeeded,
+  ///    but the read returned a partial block, and the LRU cache contained a
+  ///    block at a higher offset (indicating that the partial block should have
+  ///    been a full block).
+  /// 3) OUT_OF_RANGE if the read from the remote filesystem succeeded, but
+  ///    the file contents do not extend past `offset` and thus nothing was
+  ///    placed in `out`.
+  /// 4) OK otherwise (i.e. the read succeeded, and at least one byte was placed
+  ///    in `out`).
+  Status Read(const string& filename, size_t offset, size_t n, char* buffer,
+              size_t* bytes_transferred) override;
+
+  /// Remove all cached blocks for `filename`.
+  void RemoveFile(const string& filename) override LOCKS_EXCLUDED(mu_);
+
+  /// Remove all cached data.
+  void Flush() override LOCKS_EXCLUDED(mu_);
+
+  /// Accessors for cache parameters.
+  size_t block_size() const override { return block_size_; }
+  size_t max_bytes() const override { return max_bytes_; }
+  uint64 max_staleness() const override { return max_staleness_; }
+
+  /// The current size (in bytes) of the cache.
+  size_t CacheSize() const override LOCKS_EXCLUDED(mu_);
+
+ private:
+  /// The size of the blocks stored in the LRU cache, as well as the size of the
+  /// reads from the underlying filesystem.
+  const size_t block_size_;
+  /// The maximum number of bytes (sum of block sizes) allowed in the LRU cache.
+  const size_t max_bytes_;
+  /// The maximum staleness of any block in the LRU cache, in seconds.
+  const uint64 max_staleness_;
+  /// The callback to read a block from the underlying filesystem.
+  const BlockFetcher block_fetcher_;
+  /// The Env from which we read timestamps.
+  Env* const env_;  // not owned
+
+  /// \brief The key type for the file block cache.
+  ///
+  /// The file block cache key is a {filename, offset} pair.
+  typedef std::pair<string, size_t> Key;
+
+  /// \brief The state of a block.
+  ///
+  /// A block begins in the CREATED stage. The first thread will attempt to read
+  /// the block from the filesystem, transitioning the state of the block to
+  /// FETCHING. After completing, if the read was successful the state should
+  /// be FINISHED. Otherwise the state should be ERROR. A subsequent read can
+  /// re-fetch the block if the state is ERROR.
+  enum class FetchState {
+    CREATED,
+    FETCHING,
+    FINISHED,
+    ERROR,
+  };
+
+  /// \brief A block of a file.
+  ///
+  /// A file block consists of the block data, the block's current position in
+  /// the LRU cache, the timestamp (seconds since epoch) at which the block
+  /// was cached, a coordination lock, and state & condition variables.
+  ///
+  /// Thread safety:
+  /// The iterator and timestamp fields should only be accessed while holding
+  /// the block-cache-wide mu_ instance variable. The state variable should only
+  /// be accessed while holding the Block's mu lock. The data vector should only
+  /// be accessed after state == FINISHED, and it should never be modified.
+  ///
+  /// In order to prevent deadlocks, never grab the block-cache-wide mu_ lock
+  /// AFTER grabbing any block's mu lock. It is safe to grab mu without locking
+  /// mu_.
+  struct Block {
+    /// The block data.
+    std::vector<char> data;
+    /// A list iterator pointing to the block's position in the LRU list.
+    std::list<Key>::iterator lru_iterator;
+    /// A list iterator pointing to the block's position in the LRA list.
+    std::list<Key>::iterator lra_iterator;
+    /// The timestamp (seconds since epoch) at which the block was cached.
+    uint64 timestamp;
+    /// Mutex to guard state variable
+    mutex mu;
+    /// The state of the block.
+    FetchState state GUARDED_BY(mu) = FetchState::CREATED;
+    /// Wait on cond_var if state is FETCHING.
+    condition_variable cond_var;
+  };
+
+  /// \brief The block map type for the file block cache.
+  ///
+  /// The block map is an ordered map from Key to Block.
+  typedef std::map<Key, std::shared_ptr<Block>> BlockMap;
+
+  /// Prune the cache by removing files with expired blocks.
+  void Prune() LOCKS_EXCLUDED(mu_);
+
+  bool BlockNotStale(const std::shared_ptr<Block>& block)
+      EXCLUSIVE_LOCKS_REQUIRED(mu_);
+
+  /// Look up a Key in the block cache.
+  std::shared_ptr<Block> Lookup(const Key& key) LOCKS_EXCLUDED(mu_);
+
+  Status MaybeFetch(const Key& key, const std::shared_ptr<Block>& block)
+      LOCKS_EXCLUDED(mu_);
+
+  /// Trim the block cache to make room for another entry.
+  void Trim() EXCLUSIVE_LOCKS_REQUIRED(mu_);
+
+  /// Update the LRU iterator for the block at `key`.
+  Status UpdateLRU(const Key& key, const std::shared_ptr<Block>& block)
+      LOCKS_EXCLUDED(mu_);
+
+  /// Remove all blocks of a file, with mu_ already held.
+  void RemoveFile_Locked(const string& filename) EXCLUSIVE_LOCKS_REQUIRED(mu_);
+
+  /// Remove the block `entry` from the block map and LRU list, and update the
+  /// cache size accordingly.
+  void RemoveBlock(BlockMap::iterator entry) EXCLUSIVE_LOCKS_REQUIRED(mu_);
+
+  /// The cache pruning thread that removes files with expired blocks.
+  std::unique_ptr<Thread> pruning_thread_;
+
+  /// Notification for stopping the cache pruning thread.
+  Notification stop_pruning_thread_;
+
+  /// Guards access to the block map, LRU list, and cached byte count.
+  mutable mutex mu_;
+
+  /// The block map (map from Key to Block).
+  BlockMap block_map_ GUARDED_BY(mu_);
+
+  /// The LRU list of block keys. The front of the list identifies the most
+  /// recently accessed block.
+  std::list<Key> lru_list_ GUARDED_BY(mu_);
+
+  /// The LRA (least recently added) list of block keys. The front of the list
+  /// identifies the most recently added block.
+  ///
+  /// Note: blocks are added to lra_list_ only after they have successfully been
+  /// fetched from the underlying block store.
+  std::list<Key> lra_list_ GUARDED_BY(mu_);
+
+  /// The combined number of bytes in all of the cached blocks.
+  size_t cache_size_ GUARDED_BY(mu_) = 0;
+};
+
+}  // namespace tensorflow
+
+#endif  // TENSORFLOW_CORE_PLATFORM_CLOUD_RAM_FILE_BLOCK_CACHE_H_
diff --git a/tensorflow/core/platform/cloud/ram_file_block_cache_test.cc b/tensorflow/core/platform/cloud/ram_file_block_cache_test.cc
new file mode 100644
index 0000000000..d555b682a6
--- /dev/null
+++ b/tensorflow/core/platform/cloud/ram_file_block_cache_test.cc
@@ -0,0 +1,519 @@
+/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+==============================================================================*/
+
+#include "tensorflow/core/platform/cloud/ram_file_block_cache.h"
+#include <cstring>
+#include "tensorflow/core/lib/core/blocking_counter.h"
+#include "tensorflow/core/lib/core/status_test_util.h"
+#include "tensorflow/core/platform/cloud/now_seconds_env.h"
+#include "tensorflow/core/platform/env.h"
+#include "tensorflow/core/platform/notification.h"
+#include "tensorflow/core/platform/test.h"
+
+namespace tensorflow {
+namespace {
+
+Status ReadCache(RamFileBlockCache* cache, const string& filename,
+                 size_t offset, size_t n, std::vector<char>* out) {
+  out->clear();
+  out->resize(n, 0);
+  size_t bytes_transferred = 0;
+  Status status =
+      cache->Read(filename, offset, n, out->data(), &bytes_transferred);
+  EXPECT_LE(bytes_transferred, n);
+  out->resize(bytes_transferred, n);
+  return status;
+}
+
+TEST(RamFileBlockCacheTest, PassThrough) {
+  const string want_filename = "foo/bar";
+  const size_t want_offset = 42;
+  const size_t want_n = 1024;
+  int calls = 0;
+  auto fetcher = [&calls, want_filename, want_offset, want_n](
+                     const string& got_filename, size_t got_offset,
+                     size_t got_n, char* buffer, size_t* bytes_transferred) {
+    EXPECT_EQ(got_filename, want_filename);
+    EXPECT_EQ(got_offset, want_offset);
+    EXPECT_EQ(got_n, want_n);
+    calls++;
+    memset(buffer, 'x', got_n);
+    *bytes_transferred = got_n;
+    return Status::OK();
+  };
+  // If block_size, max_bytes, or both are zero, the cache is a pass-through.
+  RamFileBlockCache cache1(1, 0, 0, fetcher);
+  RamFileBlockCache cache2(0, 1, 0, fetcher);
+  RamFileBlockCache cache3(0, 0, 0, fetcher);
+  std::vector<char> out;
+  TF_EXPECT_OK(ReadCache(&cache1, want_filename, want_offset, want_n, &out));
+  EXPECT_EQ(calls, 1);
+  TF_EXPECT_OK(ReadCache(&cache2, want_filename, want_offset, want_n, &out));
+  EXPECT_EQ(calls, 2);
+  TF_EXPECT_OK(ReadCache(&cache3, want_filename, want_offset, want_n, &out));
+  EXPECT_EQ(calls, 3);
+}
+
+TEST(RamFileBlockCacheTest, BlockAlignment) {
+  // Initialize a 256-byte buffer. This is the file underlying the reads we'll
+  // do in this test.
+  const size_t size = 256;
+  std::vector<char> buf;
+  for (int i = 0; i < size; i++) {
+    buf.push_back(i);
+  }
+  // The fetcher just fetches slices of the buffer.
+  auto fetcher = [&buf](const string& filename, size_t offset, size_t n,
+                        char* buffer, size_t* bytes_transferred) {
+    if (offset < buf.size()) {
+      size_t bytes_to_copy = std::min(buf.size() - offset, n);
+      memcpy(buffer, buf.data() + offset, bytes_to_copy);
+      *bytes_transferred = bytes_to_copy;
+    } else {
+      *bytes_transferred = 0;
+    }
+    return Status::OK();
+  };
+  for (size_t block_size = 2; block_size <= 4; block_size++) {
+    // Make a cache of N-byte block size (1 block) and verify that reads of
+    // varying offsets and lengths return correct data.
+    RamFileBlockCache cache(block_size, block_size, 0, fetcher);
+    for (size_t offset = 0; offset < 10; offset++) {
+      for (size_t n = block_size - 2; n <= block_size + 2; n++) {
+        std::vector<char> got;
+        TF_EXPECT_OK(ReadCache(&cache, "", offset, n, &got));
+        // Verify the size of the read.
+        if (offset + n <= size) {
+          // Expect a full read.
+          EXPECT_EQ(got.size(), n) << "block size = " << block_size
+                                   << ", offset = " << offset << ", n = " << n;
+        } else {
+          // Expect a partial read.
+          EXPECT_EQ(got.size(), size - offset)
+              << "block size = " << block_size << ", offset = " << offset
+              << ", n = " << n;
+        }
+        // Verify the contents of the read.
+        std::vector<char>::const_iterator begin = buf.begin() + offset;
+        std::vector<char>::const_iterator end =
+            offset + n > buf.size() ? buf.end() : begin + n;
+        std::vector<char> want(begin, end);
+        EXPECT_EQ(got, want) << "block size = " << block_size
+                             << ", offset = " << offset << ", n = " << n;
+      }
+    }
+  }
+}
+
+TEST(RamFileBlockCacheTest, CacheHits) {
+  const size_t block_size = 16;
+  std::set<size_t> calls;
+  auto fetcher = [&calls, block_size](const string& filename, size_t offset,
+                                      size_t n, char* buffer,
+                                      size_t* bytes_transferred) {
+    EXPECT_EQ(n, block_size);
+    EXPECT_EQ(offset % block_size, 0);
+    EXPECT_EQ(calls.find(offset), calls.end()) << "at offset " << offset;
+    calls.insert(offset);
+    memset(buffer, 'x', n);
+    *bytes_transferred = n;
+    return Status::OK();
+  };
+  const uint32 block_count = 256;
+  RamFileBlockCache cache(block_size, block_count * block_size, 0, fetcher);
+  std::vector<char> out;
+  out.resize(block_count, 0);
+  // The cache has space for `block_count` blocks. The loop with i = 0 should
+  // fill the cache, and the loop with i = 1 should be all cache hits. The
+  // fetcher checks that it is called once and only once for each offset (to
+  // fetch the corresponding block).
+  for (int i = 0; i < 2; i++) {
+    for (int j = 0; j < block_count; j++) {
+      TF_EXPECT_OK(ReadCache(&cache, "", block_size * j, block_size, &out));
+    }
+  }
+}
+
+TEST(RamFileBlockCacheTest, OutOfRange) {
+  // Tests reads of a 24-byte file with block size 16.
+  const size_t block_size = 16;
+  const size_t file_size = 24;
+  bool first_block = false;
+  bool second_block = false;
+  auto fetcher = [block_size, file_size, &first_block, &second_block](
+                     const string& filename, size_t offset, size_t n,
+                     char* buffer, size_t* bytes_transferred) {
+    EXPECT_EQ(n, block_size);
+    EXPECT_EQ(offset % block_size, 0);
+    size_t bytes_to_copy = 0;
+    if (offset == 0) {
+      // The first block (16 bytes) of the file.
+      memset(buffer, 'x', n);
+      bytes_to_copy = n;
+      first_block = true;
+    } else if (offset == block_size) {
+      // The second block (8 bytes) of the file.
+      bytes_to_copy = file_size - block_size;
+      memset(buffer, 'x', bytes_to_copy);
+      second_block = true;
+    }
+    *bytes_transferred = bytes_to_copy;
+    return Status::OK();
+  };
+  RamFileBlockCache cache(block_size, block_size, 0, fetcher);
+  std::vector<char> out;
+  // Reading the first 16 bytes should be fine.
+  TF_EXPECT_OK(ReadCache(&cache, "", 0, block_size, &out));
+  EXPECT_TRUE(first_block);
+  EXPECT_EQ(out.size(), block_size);
+  // Reading at offset file_size + 4 will read the second block (since the read
+  // at file_size + 4 = 28 will be aligned to an offset of 16) but will return
+  // OutOfRange because the offset is past the end of the 24-byte file.
+  Status status = ReadCache(&cache, "", file_size + 4, 4, &out);
+  EXPECT_EQ(status.code(), error::OUT_OF_RANGE);
+  EXPECT_TRUE(second_block);
+  // Reading the second full block will return 8 bytes, from a cache hit.
+  second_block = false;
+  TF_EXPECT_OK(ReadCache(&cache, "", block_size, block_size, &out));
+  EXPECT_FALSE(second_block);
+  EXPECT_EQ(out.size(), file_size - block_size);
+}
+
+TEST(RamFileBlockCacheTest, Inconsistent) {
+  // Tests the detection of interrupted reads leading to partially filled blocks
+  // where we expected complete blocks.
+  const size_t block_size = 16;
+  // This fetcher returns OK but only fills in one byte for any offset.
+  auto fetcher = [block_size](const string& filename, size_t offset, size_t n,
+                              char* buffer, size_t* bytes_transferred) {
+    EXPECT_EQ(n, block_size);
+    EXPECT_EQ(offset % block_size, 0);
+    EXPECT_GE(n, 1);
+    memset(buffer, 'x', 1);
+    *bytes_transferred = 1;
+    return Status::OK();
+  };
+  RamFileBlockCache cache(block_size, 2 * block_size, 0, fetcher);
+  std::vector<char> out;
+  // Read the second block; this should yield an OK status and a single byte.
+  TF_EXPECT_OK(ReadCache(&cache, "", block_size, block_size, &out));
+  EXPECT_EQ(out.size(), 1);
+  // Now read the first block; this should yield an INTERNAL error because we
+  // had already cached a partial block at a later position.
+  Status status = ReadCache(&cache, "", 0, block_size, &out);
+  EXPECT_EQ(status.code(), error::INTERNAL);
+}
+
+TEST(RamFileBlockCacheTest, LRU) {
+  const size_t block_size = 16;
+  std::list<size_t> calls;
+  auto fetcher = [&calls, block_size](const string& filename, size_t offset,
+                                      size_t n, char* buffer,
+                                      size_t* bytes_transferred) {
+    EXPECT_EQ(n, block_size);
+    EXPECT_FALSE(calls.empty()) << "at offset = " << offset;
+    if (!calls.empty()) {
+      EXPECT_EQ(offset, calls.front());
+      calls.pop_front();
+    }
+    memset(buffer, 'x', n);
+    *bytes_transferred = n;
+    return Status::OK();
+  };
+  const uint32 block_count = 2;
+  RamFileBlockCache cache(block_size, block_count * block_size, 0, fetcher);
+  std::vector<char> out;
+  // Read blocks from the cache, and verify the LRU behavior based on the
+  // fetcher calls that the cache makes.
+  calls.push_back(0);
+  // Cache miss - drains an element from `calls`.
+  TF_EXPECT_OK(ReadCache(&cache, "", 0, 1, &out));
+  // Cache hit - does not drain an element from `calls`.
+  TF_EXPECT_OK(ReadCache(&cache, "", 0, 1, &out));
+  calls.push_back(block_size);
+  // Cache miss followed by cache hit.
+  TF_EXPECT_OK(ReadCache(&cache, "", block_size, 1, &out));
+  TF_EXPECT_OK(ReadCache(&cache, "", block_size, 1, &out));
+  calls.push_back(2 * block_size);
+  // Cache miss followed by cache hit. Causes eviction of LRU element.
+  TF_EXPECT_OK(ReadCache(&cache, "", 2 * block_size, 1, &out));
+  TF_EXPECT_OK(ReadCache(&cache, "", 2 * block_size, 1, &out));
+  // LRU element was at offset 0. Cache miss.
+  calls.push_back(0);
+  TF_EXPECT_OK(ReadCache(&cache, "", 0, 1, &out));
+  // Element at 2 * block_size is still in cache, and this read should update
+  // its position in the LRU list so it doesn't get evicted by the next read.
+  TF_EXPECT_OK(ReadCache(&cache, "", 2 * block_size, 1, &out));
+  // Element at block_size was evicted. Reading this element will also cause
+  // the LRU element (at 0) to be evicted.
+  calls.push_back(block_size);
+  TF_EXPECT_OK(ReadCache(&cache, "", block_size, 1, &out));
+  // Element at 0 was evicted again.
+  calls.push_back(0);
+  TF_EXPECT_OK(ReadCache(&cache, "", 0, 1, &out));
+}
+
+TEST(RamFileBlockCacheTest, MaxStaleness) {
+  int calls = 0;
+  auto fetcher = [&calls](const string& filename, size_t offset, size_t n,
+                          char* buffer, size_t* bytes_transferred) {
+    calls++;
+    memset(buffer, 'x', n);
+    *bytes_transferred = n;
+    return Status::OK();
+  };
+  std::vector<char> out;
+  std::unique_ptr<NowSecondsEnv> env(new NowSecondsEnv);
+  // Create a cache with max staleness of 2 seconds, and verify that it works as
+  // expected.
+  RamFileBlockCache cache1(8, 16, 2 /* max staleness */, fetcher, env.get());
+  // Execute the first read to load the block.
+  TF_EXPECT_OK(ReadCache(&cache1, "", 0, 1, &out));
+  EXPECT_EQ(calls, 1);
+  // Now advance the clock one second at a time and redo the read. The call
+  // count should advance every 3 seconds (i.e. every time the staleness is
+  // greater than 2).
+  for (int i = 1; i <= 10; i++) {
+    env->SetNowSeconds(i + 1);
+    TF_EXPECT_OK(ReadCache(&cache1, "", 0, 1, &out));
+    EXPECT_EQ(calls, 1 + i / 3);
+  }
+  // Now create a cache with max staleness of 0, and verify that it also works
+  // as expected.
+  calls = 0;
+  env->SetNowSeconds(0);
+  RamFileBlockCache cache2(8, 16, 0 /* max staleness */, fetcher, env.get());
+  // Execute the first read to load the block.
+  TF_EXPECT_OK(ReadCache(&cache2, "", 0, 1, &out));
+  EXPECT_EQ(calls, 1);
+  // Advance the clock by a huge amount and verify that the cached block is
+  // used to satisfy the read.
+  env->SetNowSeconds(365 * 24 * 60 * 60);  // ~1 year, just for fun.
+  TF_EXPECT_OK(ReadCache(&cache2, "", 0, 1, &out));
+  EXPECT_EQ(calls, 1);
+}
+
+TEST(RamFileBlockCacheTest, RemoveFile) {
+  int calls = 0;
+  auto fetcher = [&calls](const string& filename, size_t offset, size_t n,
+                          char* buffer, size_t* bytes_transferred) {
+    calls++;
+    char c = (filename == "a") ? 'a' : (filename == "b") ? 'b' : 'x';
+    if (offset > 0) {
+      // The first block is lower case and all subsequent blocks are upper case.
+      c = toupper(c);
+    }
+    memset(buffer, c, n);
+    *bytes_transferred = n;
+    return Status::OK();
+  };
+  // This cache has space for 4 blocks; we'll read from two files.
+  const size_t n = 3;
+  RamFileBlockCache cache(8, 32, 0, fetcher);
+  std::vector<char> out;
+  std::vector<char> a(n, 'a');
+  std::vector<char> b(n, 'b');
+  std::vector<char> A(n, 'A');
+  std::vector<char> B(n, 'B');
+  // Fill the cache.
+  TF_EXPECT_OK(ReadCache(&cache, "a", 0, n, &out));
+  EXPECT_EQ(out, a);
+  EXPECT_EQ(calls, 1);
+  TF_EXPECT_OK(ReadCache(&cache, "a", 8, n, &out));
+  EXPECT_EQ(out, A);
+  EXPECT_EQ(calls, 2);
+  TF_EXPECT_OK(ReadCache(&cache, "b", 0, n, &out));
+  EXPECT_EQ(out, b);
+  EXPECT_EQ(calls, 3);
+  TF_EXPECT_OK(ReadCache(&cache, "b", 8, n, &out));
+  EXPECT_EQ(out, B);
+  EXPECT_EQ(calls, 4);
+  // All four blocks should be in the cache now.
+  TF_EXPECT_OK(ReadCache(&cache, "a", 0, n, &out));
+  EXPECT_EQ(out, a);
+  TF_EXPECT_OK(ReadCache(&cache, "a", 8, n, &out));
+  EXPECT_EQ(out, A);
+  TF_EXPECT_OK(ReadCache(&cache, "b", 0, n, &out));
+  EXPECT_EQ(out, b);
+  TF_EXPECT_OK(ReadCache(&cache, "b", 8, n, &out));
+  EXPECT_EQ(out, B);
+  EXPECT_EQ(calls, 4);
+  // Remove the blocks from "a".
+  cache.RemoveFile("a");
+  // Both blocks from "b" should still be there.
+  TF_EXPECT_OK(ReadCache(&cache, "b", 0, n, &out));
+  EXPECT_EQ(out, b);
+  TF_EXPECT_OK(ReadCache(&cache, "b", 8, n, &out));
+  EXPECT_EQ(out, B);
+  EXPECT_EQ(calls, 4);
+  // The blocks from "a" should not be there.
+  TF_EXPECT_OK(ReadCache(&cache, "a", 0, n, &out));
+  EXPECT_EQ(out, a);
+  EXPECT_EQ(calls, 5);
+  TF_EXPECT_OK(ReadCache(&cache, "a", 8, n, &out));
+  EXPECT_EQ(out, A);
+  EXPECT_EQ(calls, 6);
+}
+
+TEST(RamFileBlockCacheTest, Prune) {
+  int calls = 0;
+  auto fetcher = [&calls](const string& filename, size_t offset, size_t n,
+                          char* buffer, size_t* bytes_transferred) {
+    calls++;
+    memset(buffer, 'x', n);
+    *bytes_transferred = n;
+    return Status::OK();
+  };
+  std::vector<char> out;
+  // Our fake environment is initialized with the current timestamp.
+  std::unique_ptr<NowSecondsEnv> env(new NowSecondsEnv);
+  uint64 now = Env::Default()->NowSeconds();
+  env->SetNowSeconds(now);
+  RamFileBlockCache cache(8, 32, 1 /* max staleness */, fetcher, env.get());
+  // Read three blocks into the cache, and advance the timestamp by one second
+  // with each read. Start with a block of "a" at the current timestamp `now`.
+  TF_EXPECT_OK(ReadCache(&cache, "a", 0, 1, &out));
+  // Now load a block of a different file "b" at timestamp `now` + 1.
+  env->SetNowSeconds(now + 1);
+  TF_EXPECT_OK(ReadCache(&cache, "b", 0, 1, &out));
+  // Now load a different block of file "a" at timestamp `now` + 1. When the
+  // first block of "a" expires, this block should also be removed because it
+  // also belongs to file "a".
+  TF_EXPECT_OK(ReadCache(&cache, "a", 8, 1, &out));
+  // Ensure that all blocks are in the cache (i.e. reads are cache hits).
+  EXPECT_EQ(cache.CacheSize(), 24);
+  EXPECT_EQ(calls, 3);
+  TF_EXPECT_OK(ReadCache(&cache, "a", 0, 1, &out));
+  TF_EXPECT_OK(ReadCache(&cache, "b", 0, 1, &out));
+  TF_EXPECT_OK(ReadCache(&cache, "a", 8, 1, &out));
+  EXPECT_EQ(calls, 3);
+  // Advance the fake timestamp so that "a" becomes stale via its first block.
+  env->SetNowSeconds(now + 2);
+  // The pruning thread periodically compares env->NowSeconds() with the oldest
+  // block's timestamp to see if it should evict any files. At the current fake
+  // timestamp of `now` + 2, file "a" is stale because its first block is stale,
+  // but file "b" is not stale yet. Thus, once the pruning thread wakes up (in
+  // one second of wall time), it should remove "a" and leave "b" alone.
+  uint64 start = Env::Default()->NowSeconds();
+  do {
+    Env::Default()->SleepForMicroseconds(100000);
+  } while (cache.CacheSize() == 24 && Env::Default()->NowSeconds() - start < 3);
+  // There should be one block left in the cache, and it should be the first
+  // block of "b".
+  EXPECT_EQ(cache.CacheSize(), 8);
+  TF_EXPECT_OK(ReadCache(&cache, "b", 0, 1, &out));
+  EXPECT_EQ(calls, 3);
+  // Advance the fake time to `now` + 3, at which point "b" becomes stale.
+  env->SetNowSeconds(now + 3);
+  // Wait for the pruner to remove "b".
+  start = Env::Default()->NowSeconds();
+  do {
+    Env::Default()->SleepForMicroseconds(100000);
+  } while (cache.CacheSize() == 8 && Env::Default()->NowSeconds() - start < 3);
+  // The cache should now be empty.
+  EXPECT_EQ(cache.CacheSize(), 0);
+}
+
+TEST(RamFileBlockCacheTest, ParallelReads) {
+  // This fetcher won't respond until either `callers` threads are calling it
+  // concurrently (at which point it will respond with success to all callers),
+  // or 10 seconds have elapsed (at which point it will respond with an error).
+  const int callers = 4;
+  BlockingCounter counter(callers);
+  auto fetcher = [&counter](const string& filename, size_t offset, size_t n,
+                            char* buffer, size_t* bytes_transferred) {
+    counter.DecrementCount();
+    if (!counter.WaitFor(std::chrono::seconds(10))) {
+      // This avoids having the test time out, which is harder to debug.
+      return errors::FailedPrecondition("desired concurrency not reached");
+    }
+    memset(buffer, 'x', n);
+    *bytes_transferred = n;
+    return Status::OK();
+  };
+  const int block_size = 8;
+  RamFileBlockCache cache(block_size, 2 * callers * block_size, 0, fetcher);
+  std::vector<std::unique_ptr<Thread>> threads;
+  for (int i = 0; i < callers; i++) {
+    threads.emplace_back(
+        Env::Default()->StartThread({}, "caller", [&cache, i, block_size]() {
+          std::vector<char> out;
+          TF_EXPECT_OK(
+              ReadCache(&cache, "a", i * block_size, block_size, &out));
+          std::vector<char> x(block_size, 'x');
+          EXPECT_EQ(out, x);
+        }));
+  }
+  // The `threads` destructor blocks until the threads can be joined, once their
+  // respective reads finish (which happens once they are all concurrently being
+  // executed, or 10 seconds have passed).
+}
+
+TEST(RamFileBlockCacheTest, CoalesceConcurrentReads) {
+  // Concurrent reads to the same file blocks should be de-duplicated.
+  const size_t block_size = 16;
+  int num_requests = 0;
+  Notification notification;
+  auto fetcher = [&num_requests, &notification, block_size](
+                     const string& filename, size_t offset, size_t n,
+                     char* buffer, size_t* bytes_transferred) {
+    EXPECT_EQ(n, block_size);
+    EXPECT_EQ(offset, 0);
+    num_requests++;
+    memset(buffer, 'x', n);
+    *bytes_transferred = n;
+    notification.Notify();
+    // Wait for other thread to issue read.
+    Env::Default()->SleepForMicroseconds(100000);  // 0.1 secs
+    return Status::OK();
+  };
+  RamFileBlockCache cache(block_size, block_size, 0, fetcher);
+  // Fork off thread for parallel read.
+  std::unique_ptr<Thread> concurrent(
+      Env::Default()->StartThread({}, "concurrent", [&cache, block_size] {
+        std::vector<char> out;
+        TF_EXPECT_OK(ReadCache(&cache, "", 0, block_size / 2, &out));
+        EXPECT_EQ(out.size(), block_size / 2);
+      }));
+  EXPECT_TRUE(WaitForNotificationWithTimeout(&notification, 10000))
+      << "Timeout waiting for concurrent thread to start.";
+  std::vector<char> out;
+  TF_EXPECT_OK(ReadCache(&cache, "", block_size / 2, block_size / 2, &out));
+  EXPECT_EQ(out.size(), block_size / 2);
+
+  EXPECT_EQ(1, num_requests);
+}
+
+TEST(RamFileBlockCacheTest, Flush) {
+  int calls = 0;
+  auto fetcher = [&calls](const string& filename, size_t offset, size_t n,
+                          char* buffer, size_t* bytes_transferred) {
+    calls++;
+    memset(buffer, 'x', n);
+    *bytes_transferred = n;
+    return Status::OK();
+  };
+  RamFileBlockCache cache(16, 32, 0, fetcher);
+  std::vector<char> out;
+  TF_EXPECT_OK(ReadCache(&cache, "", 0, 16, &out));
+  TF_EXPECT_OK(ReadCache(&cache, "", 0, 16, &out));
+  EXPECT_EQ(calls, 1);
+  cache.Flush();
+  TF_EXPECT_OK(ReadCache(&cache, "", 0, 16, &out));
+  EXPECT_EQ(calls, 2);
+}
+
+}  // namespace
+}  // namespace tensorflow
--
cgit v1.2.3
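
For readers of this change, a minimal usage sketch of the extracted cache follows; it is not part of the patch. The fake 1 MiB "file", the sizes, and the ExampleRead helper are assumptions made for illustration; only the RamFileBlockCache constructor and Read() signatures come from ram_file_block_cache.h above, and a real client (such as the GCS filesystem) would issue a ranged read against the remote object inside the fetcher.

// Illustrative sketch only (not part of the patch): wiring a block fetcher
// into RamFileBlockCache and reading through it.
#include <algorithm>
#include <cstring>
#include <vector>

#include "tensorflow/core/platform/cloud/ram_file_block_cache.h"

namespace tensorflow {

Status ExampleRead() {
  // Hypothetical fetcher: pretend every file is 1 MiB of 'x' bytes. Copy at
  // most `n` bytes starting at `offset`; reporting zero bytes transferred
  // lets Read() surface OUT_OF_RANGE for reads past end-of-file.
  auto fetcher = [](const string& filename, size_t offset, size_t n,
                    char* buffer, size_t* bytes_transferred) {
    const size_t file_size = 1 << 20;
    if (offset >= file_size) {
      *bytes_transferred = 0;
      return Status::OK();
    }
    const size_t bytes_to_copy = std::min(n, file_size - offset);
    memset(buffer, 'x', bytes_to_copy);
    *bytes_transferred = bytes_to_copy;
    return Status::OK();
  };
  // Illustrative parameters: 256 KiB blocks, a 16 MiB byte budget, and blocks
  // considered stale (and prunable) after 60 seconds.
  RamFileBlockCache cache(256 * 1024, 16 * 1024 * 1024, 60, fetcher);
  std::vector<char> out(4096);
  size_t bytes_transferred = 0;
  // Only the first read of this range invokes the fetcher; repeated reads are
  // served from the in-RAM block cache until eviction or expiry.
  return cache.Read("gs://bucket/object", 0, out.size(), out.data(),
                    &bytes_transferred);
}

}  // namespace tensorflow

As the tests above exercise, passing zero for block_size or max_bytes makes the cache a pass-through, and a non-zero max_staleness is what starts the background pruning thread.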