diff options
author | 2017-07-14 20:15:52 -0700 | |
---|---|---|
committer | 2017-07-14 20:19:40 -0700 | |
commit | 35b606b976de752ee822ace4e290e253bb9482ac (patch) | |
tree | 2a0d4a02897e415447c560ef5237415ab144dd99 /tensorflow | |
parent | ca5629f77527ad53745db14ea95b64fffb173cb0 (diff) |
FileBlockCache is now a filesystem-wide shared cache with a configurable maximum size (in total bytes of cached block contents). The default maximum cache size is equal to the block size, and can be overridden by the environment variable GCS_READ_CACHE_MAX_SIZE_MB. In addition, deleting files now flushes them from the block cache, and renaming files flushes both the source and destination files from the block cache.
PiperOrigin-RevId: 162041685
Diffstat (limited to 'tensorflow')
-rw-r--r-- | tensorflow/core/platform/cloud/file_block_cache.cc | 90 | ||||
-rw-r--r-- | tensorflow/core/platform/cloud/file_block_cache.h | 78 | ||||
-rw-r--r-- | tensorflow/core/platform/cloud/file_block_cache_test.cc | 188 | ||||
-rw-r--r-- | tensorflow/core/platform/cloud/gcs_file_system.cc | 129 | ||||
-rw-r--r-- | tensorflow/core/platform/cloud/gcs_file_system.h | 38 | ||||
-rw-r--r-- | tensorflow/core/platform/cloud/gcs_file_system_test.cc | 500 |
6 files changed, 598 insertions, 425 deletions
diff --git a/tensorflow/core/platform/cloud/file_block_cache.cc b/tensorflow/core/platform/cloud/file_block_cache.cc index 84ef0ea1fe..27e023745e 100644 --- a/tensorflow/core/platform/cloud/file_block_cache.cc +++ b/tensorflow/core/platform/cloud/file_block_cache.cc @@ -20,62 +20,70 @@ limitations under the License. namespace tensorflow { -Status FileBlockCache::Read(uint64 offset, size_t n, std::vector<char>* out) { +Status FileBlockCache::Read(const string& filename, size_t offset, size_t n, + std::vector<char>* out) { out->clear(); if (n == 0) { return Status::OK(); } - if (block_size_ == 0 || block_count_ == 0) { + if (block_size_ == 0 || max_bytes_ == 0) { // The cache is effectively disabled, so we pass the read through to the // fetcher without breaking it up into blocks. - return block_fetcher_(offset, n, out); + return block_fetcher_(filename, offset, n, out); } // Calculate the block-aligned start and end of the read. - uint64 start = block_size_ * (offset / block_size_); - uint64 finish = block_size_ * ((offset + n) / block_size_); + size_t start = block_size_ * (offset / block_size_); + size_t finish = block_size_ * ((offset + n) / block_size_); if (finish < offset + n) { finish += block_size_; } mutex_lock lock(mu_); - // If the oldest block arrived max_staleness_ in the past, flush the cache. - // Note that if max_staleness_ is 0, we don't expire any cached blocks. - if (max_staleness_ > 0 && timestamp_ > 0 && - env_->NowSeconds() - timestamp_ > max_staleness_) { - TrimCache(0); - timestamp_ = 0; - } // Now iterate through the blocks, reading them one at a time. Reads are // locked so that only one block_fetcher call is active at any given time. - for (uint64 pos = start; pos < finish; pos += block_size_) { - auto entry = block_map_.find(pos); + for (size_t pos = start; pos < finish; pos += block_size_) { + Key key = std::make_pair(filename, pos); + auto entry = block_map_.find(key); + // If we're enforcing max staleness and the block is stale, remove all of + // the file's cached blocks so we reload them. + if (entry != block_map_.end() && max_staleness_ > 0 && + env_->NowSeconds() - entry->second->timestamp > max_staleness_) { + RemoveFile_Locked(filename); + entry = block_map_.end(); + } if (entry == block_map_.end()) { // We need to fetch the block from the remote filesystem. Trim the LRU // cache if needed - we do this up front in order to avoid any period of // time during which the cache size exceeds its desired limit. The - // tradeoff is that if the fetcher fails, the cache may evict a block + // tradeoff is that if the fetcher fails, the cache may evict blocks // prematurely. - TrimCache(block_count_ - 1); + while (!lru_list_.empty() && cache_size_ + block_size_ > max_bytes_) { + RemoveBlock(block_map_.find(lru_list_.back())); + } std::unique_ptr<Block> block(new Block); - TF_RETURN_IF_ERROR(block_fetcher_(pos, block_size_, &block->data)); + TF_RETURN_IF_ERROR( + block_fetcher_(filename, pos, block_size_, &block->data)); // Sanity check to detect interrupted reads leading to partial blocks: a // partial block must have a higher key than the highest existing key in - // the block map. - if (block->data.size() < block_size_ && !block_map_.empty() && - pos < block_map_.rbegin()->first) { - // We expected to read a full block at this position. - return errors::FailedPrecondition("File contents are inconsistent"); - } - entry = block_map_.emplace(std::make_pair(pos, std::move(block))).first; - if (timestamp_ == 0) { - // Mark the timestamp of the first block's arrival in the cache. - timestamp_ = env_->NowSeconds(); + // the block map for the file. + if (block->data.size() < block_size_ && !block_map_.empty()) { + Key fmax = std::make_pair(filename, std::numeric_limits<size_t>::max()); + auto fcmp = block_map_.upper_bound(fmax); + if (fcmp != block_map_.begin() && key < (--fcmp)->first) { + // We expected to read a full block at this position. + return errors::Internal("File contents are inconsistent"); + } } + // Record the block timestamp, update the cache size, and add the block to + // the cache. + block->timestamp = env_->NowSeconds(); + cache_size_ += block->data.size(); + entry = block_map_.emplace(std::make_pair(key, std::move(block))).first; } else { // Cache hit. Remove the block from the LRU list at its prior location. lru_list_.erase(entry->second->lru_iterator); } // Push the block to the front of the LRU list. - lru_list_.push_front(pos); + lru_list_.push_front(key); entry->second->lru_iterator = lru_list_.begin(); // Copy the relevant portion of the block into the result buffer. const auto& data = entry->second->data; @@ -98,15 +106,33 @@ Status FileBlockCache::Read(uint64 offset, size_t n, std::vector<char>* out) { if (begin < end) { out->insert(out->end(), begin, end); } + if (data.size() < block_size_) { + // The block was a partial block and thus signals EOF at its upper bound. + break; + } } return Status::OK(); } -void FileBlockCache::TrimCache(size_t size) { - while (lru_list_.size() > size) { - block_map_.erase(lru_list_.back()); - lru_list_.pop_back(); +void FileBlockCache::RemoveFile(const string& filename) { + mutex_lock lock(mu_); + RemoveFile_Locked(filename); +} + +void FileBlockCache::RemoveFile_Locked(const string& filename) { + Key begin = std::make_pair(filename, 0); + auto it = block_map_.lower_bound(begin); + while (it != block_map_.end() && it->first.first == filename) { + auto next = std::next(it); + RemoveBlock(it); + it = next; } } +void FileBlockCache::RemoveBlock(BlockMap::iterator entry) { + lru_list_.erase(entry->second->lru_iterator); + cache_size_ -= entry->second->data.size(); + block_map_.erase(entry); +} + } // namespace tensorflow diff --git a/tensorflow/core/platform/cloud/file_block_cache.h b/tensorflow/core/platform/cloud/file_block_cache.h index 4acc4c7fca..02e00b5bc9 100644 --- a/tensorflow/core/platform/cloud/file_block_cache.h +++ b/tensorflow/core/platform/cloud/file_block_cache.h @@ -31,9 +31,9 @@ limitations under the License. namespace tensorflow { -/// \brief An LRU block cache of file contents. +/// \brief An LRU block cache of file contents, keyed by {filename, offset}. /// -/// This class should be used by read-only random access files on a remote +/// This class should be shared by read-only random access files on a remote /// filesystem (e.g. GCS). class FileBlockCache { public: @@ -42,18 +42,20 @@ class FileBlockCache { /// cache is constructed. The returned Status should be OK as long as the /// read from the remote filesystem succeeded (similar to the semantics of the /// read(2) system call). - typedef std::function<Status(uint64, size_t, std::vector<char>*)> + typedef std::function<Status(const string&, size_t, size_t, + std::vector<char>*)> BlockFetcher; - FileBlockCache(uint64 block_size, uint32 block_count, uint64 max_staleness, + FileBlockCache(size_t block_size, size_t max_bytes, uint64 max_staleness, BlockFetcher block_fetcher, Env* env = Env::Default()) : block_size_(block_size), - block_count_(block_count), + max_bytes_(max_bytes), max_staleness_(max_staleness), block_fetcher_(block_fetcher), env_(env) {} - /// Read `n` bytes starting at `offset` into `out`. This method will return: + /// Read `n` bytes from `filename` starting at `offset` into `out`. This + /// method will return: /// /// 1) The error from the remote filesystem, if the read from the remote /// filesystem failed. @@ -66,17 +68,23 @@ class FileBlockCache { /// placed in `out`. /// 4) OK otherwise (i.e. the read succeeded, and at least one byte was placed /// in `out`). - Status Read(uint64 offset, size_t n, std::vector<char>* out); + Status Read(const string& filename, size_t offset, size_t n, + std::vector<char>* out); - private: - /// Trim the LRU cache until its size is at most `size` blocks. - void TrimCache(size_t size) EXCLUSIVE_LOCKS_REQUIRED(mu_); + /// Remove all cached blocks for `filename`. + void RemoveFile(const string& filename) LOCKS_EXCLUDED(mu_); + + /// Accessors for cache parameters. + size_t block_size() const { return block_size_; } + size_t max_bytes() const { return max_bytes_; } + uint64 max_staleness() const { return max_staleness_; } + private: /// The size of the blocks stored in the LRU cache, as well as the size of the /// reads from the underlying filesystem. - const uint64 block_size_; - /// The maximum number of blocks allowed in the LRU cache. - const uint32 block_count_; + const size_t block_size_; + /// The maximum number of bytes (sum of block sizes) allowed in the LRU cache. + const size_t max_bytes_; /// The maximum staleness of any block in the LRU cache, in seconds. const uint64 max_staleness_; /// The callback to read a block from the underlying filesystem. @@ -84,31 +92,49 @@ class FileBlockCache { /// The Env from which we read timestamps. Env* const env_; // not owned + /// \brief The key type for the file block cache. + /// + /// The file block cache key is a {filename, offset} pair. + typedef std::pair<string, size_t> Key; + /// \brief A block of a file. /// - /// A file block consists of the block data and the block's current position - /// in the LRU cache. + /// A file block consists of the block data, the block's current position in + /// the LRU cache, and the timestamp (seconds since epoch) at which the block + /// was cached. struct Block { /// The block data. std::vector<char> data; /// A list iterator pointing to the block's position in the LRU list. - std::list<uint64>::iterator lru_iterator; + std::list<Key>::iterator lru_iterator; + /// The timestamp (seconds since epoch) at which the block was cached. + uint64 timestamp; }; - /// Guards access to the block map, LRU list, and cache timestamp. + /// \brief The block map type for the file block cache. + /// + /// The block map is an ordered map from Key to Block. + typedef std::map<Key, std::unique_ptr<Block>> BlockMap; + + /// Remove all blocks of a file, with mu_ already held. + void RemoveFile_Locked(const string& filename) EXCLUSIVE_LOCKS_REQUIRED(mu_); + + /// Remove the block `entry` from the block map and LRU list, and update the + /// cache size accordingly. + void RemoveBlock(BlockMap::iterator entry) EXCLUSIVE_LOCKS_REQUIRED(mu_); + + /// Guards access to the block map, LRU list, and cached byte count. mutex mu_; - /// The block map (map from offset in the file to Block object). - std::map<uint64, std::unique_ptr<Block>> block_map_ GUARDED_BY(mu_); + /// The block map (map from Key to Block). + BlockMap block_map_ GUARDED_BY(mu_); - /// The LRU list of offsets in the file. The front of the list is the position - /// of the most recently accessed block. - std::list<uint64> lru_list_ GUARDED_BY(mu_); + /// The LRU list of block keys. The front of the list identifies the most + /// recently accessed block. + std::list<Key> lru_list_ GUARDED_BY(mu_); - /// The most recent timestamp (in seconds since epoch) at which the block map - /// transitioned from empty to non-empty. A value of 0 means the block map is - /// currently empty. - uint64 timestamp_ GUARDED_BY(mu_) = 0; + /// The combined number of bytes in all of the cached blocks. + size_t cache_size_ GUARDED_BY(mu_) = 0; }; } // namespace tensorflow diff --git a/tensorflow/core/platform/cloud/file_block_cache_test.cc b/tensorflow/core/platform/cloud/file_block_cache_test.cc index 8ff02ae00b..3f53cbc6ea 100644 --- a/tensorflow/core/platform/cloud/file_block_cache_test.cc +++ b/tensorflow/core/platform/cloud/file_block_cache_test.cc @@ -23,27 +23,30 @@ namespace tensorflow { namespace { TEST(FileBlockCacheTest, PassThrough) { - const uint64 want_offset = 42; + const string want_filename = "foo/bar"; + const size_t want_offset = 42; const size_t want_n = 1024; int calls = 0; - auto fetcher = [&calls, want_offset, want_n](uint64 got_offset, size_t got_n, - std::vector<char>* out) { + auto fetcher = [&calls, want_filename, want_offset, want_n]( + const string& got_filename, size_t got_offset, + size_t got_n, std::vector<char>* out) { + EXPECT_EQ(got_filename, want_filename); EXPECT_EQ(got_offset, want_offset); EXPECT_EQ(got_n, want_n); calls++; out->resize(got_n, 'x'); return Status::OK(); }; - // If block_size, block_count, or both are zero, the cache is a pass-through. + // If block_size, max_bytes, or both are zero, the cache is a pass-through. FileBlockCache cache1(1, 0, 0, fetcher); FileBlockCache cache2(0, 1, 0, fetcher); FileBlockCache cache3(0, 0, 0, fetcher); std::vector<char> out; - TF_EXPECT_OK(cache1.Read(want_offset, want_n, &out)); + TF_EXPECT_OK(cache1.Read(want_filename, want_offset, want_n, &out)); EXPECT_EQ(calls, 1); - TF_EXPECT_OK(cache2.Read(want_offset, want_n, &out)); + TF_EXPECT_OK(cache2.Read(want_filename, want_offset, want_n, &out)); EXPECT_EQ(calls, 2); - TF_EXPECT_OK(cache3.Read(want_offset, want_n, &out)); + TF_EXPECT_OK(cache3.Read(want_filename, want_offset, want_n, &out)); EXPECT_EQ(calls, 3); } @@ -56,7 +59,8 @@ TEST(FileBlockCacheTest, BlockAlignment) { buf.push_back(i); } // The fetcher just fetches slices of the buffer. - auto fetcher = [&buf](uint64 offset, size_t n, std::vector<char>* out) { + auto fetcher = [&buf](const string& filename, size_t offset, size_t n, + std::vector<char>* out) { if (offset < buf.size()) { if (offset + n > buf.size()) { out->insert(out->end(), buf.begin() + offset, buf.end()); @@ -66,14 +70,14 @@ TEST(FileBlockCacheTest, BlockAlignment) { } return Status::OK(); }; - for (uint64_t block_size = 2; block_size <= 4; block_size++) { + for (size_t block_size = 2; block_size <= 4; block_size++) { // Make a cache of N-byte block size (1 block) and verify that reads of // varying offsets and lengths return correct data. - FileBlockCache cache(block_size, 1, 0, fetcher); - for (uint64_t offset = 0; offset < 10; offset++) { + FileBlockCache cache(block_size, block_size, 0, fetcher); + for (size_t offset = 0; offset < 10; offset++) { for (size_t n = block_size - 2; n <= block_size + 2; n++) { std::vector<char> got; - TF_EXPECT_OK(cache.Read(offset, n, &got)); + TF_EXPECT_OK(cache.Read("", offset, n, &got)); // Verify the size of the read. if (offset + n <= size) { // Expect a full read. @@ -98,10 +102,10 @@ TEST(FileBlockCacheTest, BlockAlignment) { } TEST(FileBlockCacheTest, CacheHits) { - const uint64 block_size = 16; - std::set<uint64_t> calls; - auto fetcher = [&calls, block_size](uint64 offset, size_t n, - std::vector<char>* out) { + const size_t block_size = 16; + std::set<size_t> calls; + auto fetcher = [&calls, block_size](const string& filename, size_t offset, + size_t n, std::vector<char>* out) { EXPECT_EQ(n, block_size); EXPECT_EQ(offset % block_size, 0); EXPECT_EQ(calls.find(offset), calls.end()) << "at offset " << offset; @@ -110,7 +114,7 @@ TEST(FileBlockCacheTest, CacheHits) { return Status::OK(); }; const uint32 block_count = 256; - FileBlockCache cache(block_size, block_count, 0, fetcher); + FileBlockCache cache(block_size, block_count * block_size, 0, fetcher); std::vector<char> out; // The cache has space for `block_count` blocks. The loop with i = 0 should // fill the cache, and the loop with i = 1 should be all cache hits. The @@ -118,19 +122,20 @@ TEST(FileBlockCacheTest, CacheHits) { // fetch the corresponding block). for (int i = 0; i < 2; i++) { for (int j = 0; j < block_count; j++) { - TF_EXPECT_OK(cache.Read(block_size * j, block_size, &out)); + TF_EXPECT_OK(cache.Read("", block_size * j, block_size, &out)); } } } TEST(FileBlockCacheTest, OutOfRange) { // Tests reads of a 24-byte file with block size 16. - const uint64 block_size = 16; - const uint64 file_size = 24; + const size_t block_size = 16; + const size_t file_size = 24; bool first_block = false; bool second_block = false; auto fetcher = [block_size, file_size, &first_block, &second_block]( - uint64 offset, size_t n, std::vector<char>* out) { + const string& filename, size_t offset, size_t n, + std::vector<char>* out) { EXPECT_EQ(n, block_size); EXPECT_EQ(offset % block_size, 0); if (offset == 0) { @@ -144,22 +149,22 @@ TEST(FileBlockCacheTest, OutOfRange) { } return Status::OK(); }; - FileBlockCache cache(block_size, 1, 0, fetcher); + FileBlockCache cache(block_size, block_size, 0, fetcher); std::vector<char> out; // Reading the first 16 bytes should be fine. - TF_EXPECT_OK(cache.Read(0, block_size, &out)); + TF_EXPECT_OK(cache.Read("", 0, block_size, &out)); EXPECT_TRUE(first_block); EXPECT_EQ(out.size(), block_size); // Reading at offset file_size + 4 will read the second block (since the read // at file_size + 4 = 28 will be aligned to an offset of 16) but will return // OutOfRange because the offset is past the end of the 24-byte file. - Status status = cache.Read(file_size + 4, 4, &out); + Status status = cache.Read("", file_size + 4, 4, &out); EXPECT_EQ(status.code(), error::OUT_OF_RANGE); EXPECT_TRUE(second_block); EXPECT_EQ(out.size(), 0); // Reading the second full block will return 8 bytes, from a cache hit. second_block = false; - TF_EXPECT_OK(cache.Read(block_size, block_size, &out)); + TF_EXPECT_OK(cache.Read("", block_size, block_size, &out)); EXPECT_FALSE(second_block); EXPECT_EQ(out.size(), file_size - block_size); } @@ -167,30 +172,31 @@ TEST(FileBlockCacheTest, OutOfRange) { TEST(FileBlockCacheTest, Inconsistent) { // Tests the detection of interrupted reads leading to partially filled blocks // where we expected complete blocks. - const uint64 block_size = 16; + const size_t block_size = 16; // This fetcher returns OK but only fills in one byte for any offset. - auto fetcher = [block_size](uint64 offset, size_t n, std::vector<char>* out) { + auto fetcher = [block_size](const string& filename, size_t offset, size_t n, + std::vector<char>* out) { EXPECT_EQ(n, block_size); EXPECT_EQ(offset % block_size, 0); out->resize(1, 'x'); return Status::OK(); }; - FileBlockCache cache(block_size, 2, 0, fetcher); + FileBlockCache cache(block_size, 2 * block_size, 0, fetcher); std::vector<char> out; // Read the second block; this should yield an OK status and a single byte. - TF_EXPECT_OK(cache.Read(block_size, block_size, &out)); + TF_EXPECT_OK(cache.Read("", block_size, block_size, &out)); EXPECT_EQ(out.size(), 1); - // Now read the first block; this should yield FAILED_PRECONDITION because we + // Now read the first block; this should yield an INTERNAL error because we // had already cached a partial block at a later position. - Status status = cache.Read(0, block_size, &out); - EXPECT_EQ(status.code(), error::FAILED_PRECONDITION); + Status status = cache.Read("", 0, block_size, &out); + EXPECT_EQ(status.code(), error::INTERNAL); } TEST(FileBlockCacheTest, LRU) { - const uint64 block_size = 16; - std::list<uint64_t> calls; - auto fetcher = [&calls, block_size](uint64 offset, size_t n, - std::vector<char>* out) { + const size_t block_size = 16; + std::list<size_t> calls; + auto fetcher = [&calls, block_size](const string& filename, size_t offset, + size_t n, std::vector<char>* out) { EXPECT_EQ(n, block_size); EXPECT_FALSE(calls.empty()) << "at offset = " << offset; if (!calls.empty()) { @@ -201,36 +207,36 @@ TEST(FileBlockCacheTest, LRU) { return Status::OK(); }; const uint32 block_count = 2; - FileBlockCache cache(block_size, block_count, 0, fetcher); + FileBlockCache cache(block_size, block_count * block_size, 0, fetcher); std::vector<char> out; // Read blocks from the cache, and verify the LRU behavior based on the // fetcher calls that the cache makes. calls.push_back(0); // Cache miss - drains an element from `calls`. - TF_EXPECT_OK(cache.Read(0, 1, &out)); + TF_EXPECT_OK(cache.Read("", 0, 1, &out)); // Cache hit - does not drain an element from `calls`. - TF_EXPECT_OK(cache.Read(0, 1, &out)); + TF_EXPECT_OK(cache.Read("", 0, 1, &out)); calls.push_back(block_size); // Cache miss followed by cache hit. - TF_EXPECT_OK(cache.Read(block_size, 1, &out)); - TF_EXPECT_OK(cache.Read(block_size, 1, &out)); + TF_EXPECT_OK(cache.Read("", block_size, 1, &out)); + TF_EXPECT_OK(cache.Read("", block_size, 1, &out)); calls.push_back(2 * block_size); // Cache miss followed by cache hit. Causes eviction of LRU element. - TF_EXPECT_OK(cache.Read(2 * block_size, 1, &out)); - TF_EXPECT_OK(cache.Read(2 * block_size, 1, &out)); + TF_EXPECT_OK(cache.Read("", 2 * block_size, 1, &out)); + TF_EXPECT_OK(cache.Read("", 2 * block_size, 1, &out)); // LRU element was at offset 0. Cache miss. calls.push_back(0); - TF_EXPECT_OK(cache.Read(0, 1, &out)); + TF_EXPECT_OK(cache.Read("", 0, 1, &out)); // Element at 2 * block_size is still in cache, and this read should update // its position in the LRU list so it doesn't get evicted by the next read. - TF_EXPECT_OK(cache.Read(2 * block_size, 1, &out)); + TF_EXPECT_OK(cache.Read("", 2 * block_size, 1, &out)); // Element at block_size was evicted. Reading this element will also cause // the LRU element (at 0) to be evicted. calls.push_back(block_size); - TF_EXPECT_OK(cache.Read(block_size, 1, &out)); + TF_EXPECT_OK(cache.Read("", block_size, 1, &out)); // Element at 0 was evicted again. calls.push_back(0); - TF_EXPECT_OK(cache.Read(0, 1, &out)); + TF_EXPECT_OK(cache.Read("", 0, 1, &out)); } TEST(FileBlockCacheTest, MaxStaleness) { @@ -242,25 +248,103 @@ TEST(FileBlockCacheTest, MaxStaleness) { uint64 now_ = 1; }; int calls = 0; - auto fetcher = [&calls](uint64 offset, size_t n, std::vector<char>* out) { + auto fetcher = [&calls](const string& filename, size_t offset, size_t n, + std::vector<char>* out) { calls++; out->resize(n, 'x'); return Status::OK(); }; - std::unique_ptr<FakeEnv> env(new FakeEnv); - FileBlockCache cache(8, 1, 2 /* max staleness */, fetcher, env.get()); std::vector<char> out; + std::unique_ptr<FakeEnv> env(new FakeEnv); + // Create a cache with max staleness of 2 seconds, and verify that it works as + // expected. + FileBlockCache cache1(8, 16, 2 /* max staleness */, fetcher, env.get()); // Execute the first read to load the block. - TF_EXPECT_OK(cache.Read(0, 1, &out)); + TF_EXPECT_OK(cache1.Read("", 0, 1, &out)); EXPECT_EQ(calls, 1); // Now advance the clock one second at a time and redo the read. The call // count should advance every 3 seconds (i.e. every time the staleness is // greater than 2). for (int i = 1; i <= 10; i++) { env->now_ = i + 1; - TF_EXPECT_OK(cache.Read(0, 1, &out)); + TF_EXPECT_OK(cache1.Read("", 0, 1, &out)); EXPECT_EQ(calls, 1 + i / 3); } + // Now create a cache with max staleness of 0, and verify that it also works + // as expected. + calls = 0; + env->now_ = 0; + FileBlockCache cache2(8, 16, 0 /* max staleness */, fetcher, env.get()); + // Execute the first read to load the block. + TF_EXPECT_OK(cache2.Read("", 0, 1, &out)); + EXPECT_EQ(calls, 1); + // Advance the clock by a huge amount and verify that the cached block is + // used to satisfy the read. + env->now_ = 365 * 24 * 60 * 60; // ~1 year, just for fun. + TF_EXPECT_OK(cache2.Read("", 0, 1, &out)); + EXPECT_EQ(calls, 1); +} + +TEST(FileBlockCacheTest, RemoveFile) { + int calls = 0; + auto fetcher = [&calls](const string& filename, size_t offset, size_t n, + std::vector<char>* out) { + calls++; + char c = (filename == "a") ? 'a' : (filename == "b") ? 'b' : 'x'; + if (offset > 0) { + // The first block is lower case and all subsequent blocks are upper case. + c = toupper(c); + } + out->clear(); + out->resize(n, c); + return Status::OK(); + }; + // This cache has space for 4 blocks; we'll read from two files. + const size_t n = 3; + FileBlockCache cache(8, 32, 0, fetcher); + std::vector<char> out; + std::vector<char> a(n, 'a'); + std::vector<char> b(n, 'b'); + std::vector<char> A(n, 'A'); + std::vector<char> B(n, 'B'); + // Fill the cache. + TF_EXPECT_OK(cache.Read("a", 0, n, &out)); + EXPECT_EQ(out, a); + EXPECT_EQ(calls, 1); + TF_EXPECT_OK(cache.Read("a", 8, n, &out)); + EXPECT_EQ(out, A); + EXPECT_EQ(calls, 2); + TF_EXPECT_OK(cache.Read("b", 0, n, &out)); + EXPECT_EQ(out, b); + EXPECT_EQ(calls, 3); + TF_EXPECT_OK(cache.Read("b", 8, n, &out)); + EXPECT_EQ(out, B); + EXPECT_EQ(calls, 4); + // All four blocks should be in the cache now. + TF_EXPECT_OK(cache.Read("a", 0, n, &out)); + EXPECT_EQ(out, a); + TF_EXPECT_OK(cache.Read("a", 8, n, &out)); + EXPECT_EQ(out, A); + TF_EXPECT_OK(cache.Read("b", 0, n, &out)); + EXPECT_EQ(out, b); + TF_EXPECT_OK(cache.Read("b", 8, n, &out)); + EXPECT_EQ(out, B); + EXPECT_EQ(calls, 4); + // Remove the blocks from "a". + cache.RemoveFile("a"); + // Both blocks from "b" should still be there. + TF_EXPECT_OK(cache.Read("b", 0, n, &out)); + EXPECT_EQ(out, b); + TF_EXPECT_OK(cache.Read("b", 8, n, &out)); + EXPECT_EQ(out, B); + EXPECT_EQ(calls, 4); + // The blocks from "a" should not be there. + TF_EXPECT_OK(cache.Read("a", 0, n, &out)); + EXPECT_EQ(out, a); + EXPECT_EQ(calls, 5); + TF_EXPECT_OK(cache.Read("a", 8, n, &out)); + EXPECT_EQ(out, A); + EXPECT_EQ(calls, 6); } } // namespace diff --git a/tensorflow/core/platform/cloud/gcs_file_system.cc b/tensorflow/core/platform/cloud/gcs_file_system.cc index 85def60a1e..f1b5431108 100644 --- a/tensorflow/core/platform/cloud/gcs_file_system.cc +++ b/tensorflow/core/platform/cloud/gcs_file_system.cc @@ -59,13 +59,16 @@ constexpr char kReadaheadBufferSize[] = "GCS_READAHEAD_BUFFER_SIZE_BYTES"; // The environment variable that overrides the block size for aligned reads from // GCS. Specified in MB (e.g. "16" = 16 x 1024 x 1024 = 16777216 bytes). constexpr char kBlockSize[] = "GCS_READ_CACHE_BLOCK_SIZE_MB"; -// The environment variable that overrides the block count in the LRU cache of -// blocks read from GCS. -constexpr char kBlockCount[] = "GCS_READ_CACHE_BLOCK_COUNT"; +constexpr size_t kDefaultBlockSize = 256 * 1024 * 1024; +// The environment variable that overrides the max size of the LRU cache of +// blocks read from GCS. Specified in MB. +constexpr char kMaxCacheSize[] = "GCS_READ_CACHE_MAX_SIZE_MB"; +constexpr size_t kDefaultMaxCacheSize = kDefaultBlockSize; // The environment variable that overrides the maximum staleness of cached file // contents. Once any block of a file reaches this staleness, all cached blocks // will be evicted on the next read. constexpr char kMaxStaleness[] = "GCS_READ_CACHE_MAX_STALENESS"; +constexpr uint64 kDefaultMaxStaleness = 0; // The file statistics returned by Stat() for directories. const FileStatistics DIRECTORY_STAT(0, 0, true); @@ -220,16 +223,15 @@ Status GetBoolValue(const Json::Value& parent, const string& name, /// A GCS-based implementation of a random access file with an LRU block cache. class GcsRandomAccessFile : public RandomAccessFile { public: - explicit GcsRandomAccessFile( - const std::shared_ptr<FileBlockCache>& file_block_cache) - : file_block_cache_(file_block_cache) {} + GcsRandomAccessFile(const string& filename, FileBlockCache* file_block_cache) + : filename_(filename), file_block_cache_(file_block_cache) {} /// The implementation of reads with an LRU block cache. Thread safe. Status Read(uint64 offset, size_t n, StringPiece* result, char* scratch) const override { result->clear(); std::vector<char> out; - TF_RETURN_IF_ERROR(file_block_cache_->Read(offset, n, &out)); + TF_RETURN_IF_ERROR(file_block_cache_->Read(filename_, offset, n, &out)); std::memcpy(scratch, out.data(), std::min(out.size(), n)); *result = StringPiece(scratch, std::min(out.size(), n)); if (result->size() < n) { @@ -243,8 +245,10 @@ class GcsRandomAccessFile : public RandomAccessFile { } private: + /// The filename of this file. + const string filename_; /// The LRU block cache for this file. - mutable std::shared_ptr<FileBlockCache> file_block_cache_; + mutable FileBlockCache* file_block_cache_; // not owned }; /// \brief GCS-based implementation of a writeable file. @@ -557,76 +561,64 @@ bool GetEnvVar(const char* varname, bool (*convert)(StringPiece, T*), GcsFileSystem::GcsFileSystem() : auth_provider_(new GoogleAuthProvider()), http_request_factory_(new HttpRequest::Factory()) { + uint64 value; + size_t block_size = kDefaultBlockSize; + size_t max_bytes = kDefaultMaxCacheSize; + uint64 max_staleness = kDefaultMaxStaleness; // Apply the sys env override for the readahead buffer size if it's provided. - uint64 v64; - if (GetEnvVar(kReadaheadBufferSize, strings::safe_strtou64, &v64)) { - block_size_ = v64; + if (GetEnvVar(kReadaheadBufferSize, strings::safe_strtou64, &value)) { + block_size = value; } - // Apply the override for the block size if provided. This takes precedence - // over the readahead buffer size. - if (GetEnvVar(kBlockSize, strings::safe_strtou64, &v64)) { - block_size_ = v64 * 1024 * 1024; + // Apply the overrides for the block size (MB), max bytes (MB), and max + // staleness (seconds) if provided. + if (GetEnvVar(kBlockSize, strings::safe_strtou64, &value)) { + block_size = value * 1024 * 1024; } - // Apply the override for the block count if provided. - uint32 v32; - if (GetEnvVar(kBlockCount, strings::safe_strtou32, &v32)) { - block_count_ = v32; + if (GetEnvVar(kMaxCacheSize, strings::safe_strtou64, &value)) { + max_bytes = value * 1024 * 1024; } - // Apply the override for max staleness if provided. - if (GetEnvVar(kMaxStaleness, strings::safe_strtou64, &v64)) { - max_staleness_ = v64; + if (GetEnvVar(kMaxStaleness, strings::safe_strtou64, &value)) { + max_staleness = value; } + file_block_cache_ = MakeFileBlockCache(block_size, max_bytes, max_staleness); } GcsFileSystem::GcsFileSystem( std::unique_ptr<AuthProvider> auth_provider, std::unique_ptr<HttpRequest::Factory> http_request_factory, - size_t block_size, uint32 block_count, uint64 max_staleness, + size_t block_size, size_t max_bytes, uint64 max_staleness, int64 initial_retry_delay_usec) : auth_provider_(std::move(auth_provider)), http_request_factory_(std::move(http_request_factory)), - block_size_(block_size), - block_count_(block_count), - max_staleness_(max_staleness), + file_block_cache_( + MakeFileBlockCache(block_size, max_bytes, max_staleness)), initial_retry_delay_usec_(initial_retry_delay_usec) {} Status GcsFileSystem::NewRandomAccessFile( const string& fname, std::unique_ptr<RandomAccessFile>* result) { string bucket, object; TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object)); - // `file_cache_` is a container of FileBlockCache, keyed by filename. We look - // up the filename in this container to see if we have a FileBlockCache for - // it. If the FileBlockCache for `fname` exists in file_cache_, we return it. - // Otherwise, we create a new FileBlockCache with a block fetcher that calls - // GcsFileSystem::LoadBufferFromGCS for the bucket and object derived from - // `fname`. If a FileBlockCache is created, it is added to `file_cache_` only - // if `max_staleness_` > 0 (indicating that new random acesss files will - // tolerate stale reads coming from FileBlockCache instances that persist - // across file close/open boundaries). - std::shared_ptr<FileBlockCache> file_block_cache; - mutex_lock lock(mu_); - auto entry = file_cache_.find(fname); - if (entry == file_cache_.end()) { - file_block_cache.reset(new FileBlockCache( - block_size_, block_count_, max_staleness_, - [this, bucket, object](uint64 offset, size_t n, - std::vector<char>* out) { - return LoadBufferFromGCS(bucket, object, offset, n, out); - })); - if (max_staleness_ > 0) { - file_cache_[fname] = file_block_cache; - } - } else { - file_block_cache = entry->second; - } - result->reset(new GcsRandomAccessFile(file_block_cache)); + result->reset(new GcsRandomAccessFile(fname, file_block_cache_.get())); return Status::OK(); } +// A helper function to build a FileBlockCache for GcsFileSystem. +std::unique_ptr<FileBlockCache> GcsFileSystem::MakeFileBlockCache( + size_t block_size, size_t max_bytes, uint64 max_staleness) { + std::unique_ptr<FileBlockCache> file_block_cache( + new FileBlockCache(block_size, max_bytes, max_staleness, + [this](const string& filename, size_t offset, size_t n, + std::vector<char>* out) { + return LoadBufferFromGCS(filename, offset, n, out); + })); + return file_block_cache; +} + // A helper function to actually read the data from GCS. -Status GcsFileSystem::LoadBufferFromGCS(const string& bucket, - const string& object, uint64_t offset, +Status GcsFileSystem::LoadBufferFromGCS(const string& filename, size_t offset, size_t n, std::vector<char>* out) { + string bucket, object; + TF_RETURN_IF_ERROR(ParseGcsPath(filename, false, &bucket, &object)); string auth_token; TF_RETURN_IF_ERROR(AuthProvider::GetToken(auth_provider_.get(), &auth_token)); @@ -647,13 +639,10 @@ Status GcsFileSystem::NewWritableFile(const string& fname, std::unique_ptr<WritableFile>* result) { string bucket, object; TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object)); - result->reset(new GcsWritableFile(bucket, object, auth_provider_.get(), - http_request_factory_.get(), - [this, fname]() { - mutex_lock lock(mu_); - file_cache_.erase(fname); - }, - initial_retry_delay_usec_)); + result->reset(new GcsWritableFile( + bucket, object, auth_provider_.get(), http_request_factory_.get(), + [this, fname]() { file_block_cache_->RemoveFile(fname); }, + initial_retry_delay_usec_)); return Status::OK(); } @@ -691,14 +680,11 @@ Status GcsFileSystem::NewAppendableFile(const string& fname, // Create a writable file and pass the old content to it. string bucket, object; TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object)); - result->reset(new GcsWritableFile(bucket, object, auth_provider_.get(), - old_content_filename, - http_request_factory_.get(), - [this, fname]() { - mutex_lock lock(mu_); - file_cache_.erase(fname); - }, - initial_retry_delay_usec_)); + result->reset(new GcsWritableFile( + bucket, object, auth_provider_.get(), old_content_filename, + http_request_factory_.get(), + [this, fname]() { file_block_cache_->RemoveFile(fname); }, + initial_retry_delay_usec_)); return Status::OK(); } @@ -1040,6 +1026,7 @@ Status GcsFileSystem::DeleteFile(const string& fname) { TF_RETURN_IF_ERROR(request->AddAuthBearerHeader(auth_token)); TF_RETURN_IF_ERROR(request->SetDeleteRequest()); TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when deleting ", fname); + file_block_cache_->RemoveFile(fname); return Status::OK(); } @@ -1135,7 +1122,9 @@ Status GcsFileSystem::RenameObject(const string& src, const string& target) { TF_RETURN_IF_ERROR(request->SetResultBuffer(&output_buffer)); TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when renaming ", src, " to ", target); - + // Flush the target from the block cache. The source will be flushed in the + // DeleteFile call below. + file_block_cache_->RemoveFile(target); Json::Value root; StringPiece response_piece = StringPiece(output_buffer.data(), output_buffer.size()); diff --git a/tensorflow/core/platform/cloud/gcs_file_system.h b/tensorflow/core/platform/cloud/gcs_file_system.h index 6030b16aa0..9b284722e1 100644 --- a/tensorflow/core/platform/cloud/gcs_file_system.h +++ b/tensorflow/core/platform/cloud/gcs_file_system.h @@ -36,7 +36,7 @@ class GcsFileSystem : public FileSystem { GcsFileSystem(); GcsFileSystem(std::unique_ptr<AuthProvider> auth_provider, std::unique_ptr<HttpRequest::Factory> http_request_factory, - size_t block_size, uint32 block_count, uint64 max_staleness, + size_t block_size, size_t max_bytes, uint64 max_staleness, int64 initial_retry_delay_usec); Status NewRandomAccessFile( @@ -76,9 +76,9 @@ class GcsFileSystem : public FileSystem { Status DeleteRecursively(const string& dirname, int64* undeleted_files, int64* undeleted_dirs) override; - size_t block_size() const { return block_size_; } - uint32 block_count() const { return block_count_; } - uint64 max_staleness() const { return max_staleness_; } + size_t block_size() const { return file_block_cache_->block_size(); } + size_t max_bytes() const { return file_block_cache_->max_bytes(); } + uint64 max_staleness() const { return file_block_cache_->max_staleness(); } private: /// \brief Checks if the bucket exists. Returns OK if the check succeeded. @@ -112,31 +112,17 @@ class GcsFileSystem : public FileSystem { FileStatistics* stat); Status RenameObject(const string& src, const string& target); - /// Loads file contents from GCS for a given bucket and object. - Status LoadBufferFromGCS(const string& bucket, const string& object, - uint64_t offset, size_t n, std::vector<char>* out); + std::unique_ptr<FileBlockCache> MakeFileBlockCache(size_t block_size, + size_t max_bytes, + uint64 max_staleness); + + /// Loads file contents from GCS for a given filename, offset, and length. + Status LoadBufferFromGCS(const string& filename, size_t offset, size_t n, + std::vector<char>* out); std::unique_ptr<AuthProvider> auth_provider_; std::unique_ptr<HttpRequest::Factory> http_request_factory_; - - /// Guards access to the file cache. - mutex mu_; - - /// Cache from filename to FileBlockCache. Populated iff max_staleness_ > 0. - std::map<string, std::shared_ptr<FileBlockCache>> file_cache_ GUARDED_BY(mu_); - - /// The block size for block-aligned reads in the RandomAccessFile - /// implementation. Defaults to 256Mb. - size_t block_size_ = 256 * 1024 * 1024; - - /// The block count for the LRU cache of blocks in the RandomAccessFile - /// implementation. Defaults to 1. - uint32 block_count_ = 1; - - /// The maximum staleness of cached file blocks across close/open boundaries. - /// Defaults to 0, meaning that file blocks do not persist across close/open - /// boundaries. - uint64 max_staleness_ = 0; + std::unique_ptr<FileBlockCache> file_block_cache_; /// The initial delay for exponential backoffs when retrying failed calls. const int64 initial_retry_delay_usec_ = 1000000L; diff --git a/tensorflow/core/platform/cloud/gcs_file_system_test.cc b/tensorflow/core/platform/cloud/gcs_file_system_test.cc index 915a44ad7f..ba08c7414f 100644 --- a/tensorflow/core/platform/cloud/gcs_file_system_test.cc +++ b/tensorflow/core/platform/cloud/gcs_file_system_test.cc @@ -45,8 +45,8 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_NoBlockCache) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); std::unique_ptr<RandomAccessFile> file; TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file)); @@ -80,8 +80,8 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_NoBlockCache_differentN) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); std::unique_ptr<RandomAccessFile> file; TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file)); @@ -130,7 +130,7 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_WithBlockCache) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 9 /* block size */, 2 /* block count */, + 9 /* block size */, 18 /* max bytes */, 0 /* max staleness */, 0 /* initial retry delay */); char scratch[100]; @@ -196,7 +196,7 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_WithBlockCache_MaxStaleness) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 8 /* block size */, 2 /* block count */, + 8 /* block size */, 16 /* max bytes */, 3600 /* max staleness */, 0 /* initial retry delay */); char scratch[100]; StringPiece result; @@ -205,7 +205,7 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_WithBlockCache_MaxStaleness) { // across file close/open boundaries. for (int i = 0; i < 10; i++) { // Create two files. Since these files have the same name name and the max - // staleness of the filesystem is > 0, they will share the same block cache. + // staleness of the filesystem is > 0, they will share the same blocks. std::unique_ptr<RandomAccessFile> file1; std::unique_ptr<RandomAccessFile> file2; TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/object", &file1)); @@ -234,7 +234,7 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_NoObjectName) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* read ahead bytes */, 0 /* block count */, + 0 /* read ahead bytes */, 0 /* max bytes */, 0 /* max staleness */, 0 /* initial retry delay */); std::unique_ptr<RandomAccessFile> file; @@ -242,104 +242,57 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_NoObjectName) { fs.NewRandomAccessFile("gs://bucket/", &file).code()); } -TEST(GcsFileSystemTest, NewWritableFile_FlushesFileCache) { - // Our underlying file in this test is a 16 byte file with initial contents - // "0123456789abcdef", replaced by new contents "08192a3b4c5d6e7f". +TEST(GcsFileSystemTest, NewWritableFile) { std::vector<HttpRequest*> requests( - {new FakeHttpRequest("Uri: https://storage.googleapis.com/bucket/object\n" - "Auth Token: fake_token\n" - "Range: 0-7\n", - "01234567"), - new FakeHttpRequest("Uri: https://storage.googleapis.com/bucket/object\n" - "Auth Token: fake_token\n" - "Range: 8-15\n", - "89abcdef"), + {new FakeHttpRequest( + "Uri: https://storage.googleapis.com/bucket/path%2Fwriteable\n" + "Auth Token: fake_token\n" + "Range: 0-7\n", + "01234567"), new FakeHttpRequest( "Uri: https://www.googleapis.com/upload/storage/v1/b/bucket/o?" - "uploadType=resumable&name=object\n" + "uploadType=resumable&name=path%2Fwriteable\n" "Auth Token: fake_token\n" - "Header X-Upload-Content-Length: 16\n" + "Header X-Upload-Content-Length: 17\n" "Post: yes\n", "", {{"Location", "https://custom/upload/location"}}), new FakeHttpRequest("Uri: https://custom/upload/location\n" "Auth Token: fake_token\n" - "Header Content-Range: bytes 0-15/16\n" - "Put body: 08192a3b4c5d6e7f\n", + "Header Content-Range: bytes 0-16/17\n" + "Put body: content1,content2\n", ""), - new FakeHttpRequest("Uri: https://storage.googleapis.com/bucket/object\n" - "Auth Token: fake_token\n" - "Range: 0-7\n", - "08192a3b"), - new FakeHttpRequest("Uri: https://storage.googleapis.com/bucket/object\n" - "Auth Token: fake_token\n" - "Range: 8-15\n", - "4c5d6e7f")}); + new FakeHttpRequest( + "Uri: https://storage.googleapis.com/bucket/path%2Fwriteable\n" + "Auth Token: fake_token\n" + "Range: 0-7\n", + "01234567")}); GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 8 /* block size */, 2 /* block count */, - 3600 /* max staleness */, 0 /* initial retry delay */); - // First, read both blocks of the file with its initial contents. This should - // trigger the first two HTTP requests to GCS, to load each of the blocks in - // order. + 8 /* block size */, 8 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); + + // Read from the file first, to fill the block cache. std::unique_ptr<RandomAccessFile> rfile; - TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/object", &rfile)); + TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/path/writeable", &rfile)); char scratch[100]; StringPiece result; - TF_EXPECT_OK(rfile->Read(0, 16, &result, scratch)); - EXPECT_EQ("0123456789abcdef", result); - // Now open a writable file and write the new file contents. This should - // trigger the next two HTTP requests to GCS. + TF_EXPECT_OK(rfile->Read(0, 4, &result, scratch)); + EXPECT_EQ("0123", result); + // Open the writable file. std::unique_ptr<WritableFile> wfile; - TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/object", &wfile)); - TF_EXPECT_OK(wfile->Append("08192a3b4c5d6e7f")); - // This rfile read should hit the block cache and not trigger any requests, - // because we haven't flushed the write yet. - TF_EXPECT_OK(rfile->Read(0, 16, &result, scratch)); - EXPECT_EQ("0123456789abcdef", result); - // Now flush the write. Any opens of gs://bucket/object after this point will - // no longer use the previous FileBlockCache for that file. + TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/path/writeable", &wfile)); + TF_EXPECT_OK(wfile->Append("content1,")); + TF_EXPECT_OK(wfile->Append("content2")); TF_EXPECT_OK(wfile->Flush()); - // Reopen `rfile` and read its contents again. This will trigger the next two - // HTTP requests to GCS, which will return the (updated) file contents. Note - // that the contents returned in those HTTP requests are not really relevant; - // the main thing is that both blocks of the file are reloaded. - TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/object", &rfile)); - TF_EXPECT_OK(rfile->Read(0, 16, &result, scratch)); - EXPECT_EQ("08192a3b4c5d6e7f", result); -} - -TEST(GcsFileSystemTest, NewWritableFile) { - std::vector<HttpRequest*> requests( - {new FakeHttpRequest( - "Uri: https://www.googleapis.com/upload/storage/v1/b/bucket/o?" - "uploadType=resumable&name=path%2Fwriteable.txt\n" - "Auth Token: fake_token\n" - "Header X-Upload-Content-Length: 17\n" - "Post: yes\n", - "", {{"Location", "https://custom/upload/location"}}), - new FakeHttpRequest("Uri: https://custom/upload/location\n" - "Auth Token: fake_token\n" - "Header Content-Range: bytes 0-16/17\n" - "Put body: content1,content2\n", - "")}); - GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), - std::unique_ptr<HttpRequest::Factory>( - new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); - - std::unique_ptr<WritableFile> file; - TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/path/writeable.txt", &file)); - - TF_EXPECT_OK(file->Append("content1,")); - TF_EXPECT_OK(file->Append("content2")); - TF_EXPECT_OK(file->Flush()); + // Re-reading the file should trigger another HTTP request to GCS. + TF_EXPECT_OK(rfile->Read(0, 4, &result, scratch)); + EXPECT_EQ("0123", result); // The calls to flush, sync, and close below should not cause uploads because // the file is not dirty. - TF_EXPECT_OK(file->Flush()); - TF_EXPECT_OK(file->Sync()); - TF_EXPECT_OK(file->Close()); + TF_EXPECT_OK(wfile->Flush()); + TF_EXPECT_OK(wfile->Sync()); + TF_EXPECT_OK(wfile->Close()); } TEST(GcsFileSystemTest, NewWritableFile_ResumeUploadSucceeds) { @@ -381,8 +334,8 @@ TEST(GcsFileSystemTest, NewWritableFile_ResumeUploadSucceeds) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); std::unique_ptr<WritableFile> file; TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/path/writeable.txt", &file)); @@ -393,10 +346,18 @@ TEST(GcsFileSystemTest, NewWritableFile_ResumeUploadSucceeds) { } TEST(GcsFileSystemTest, NewWritableFile_ResumeUploadSucceedsOnGetStatus) { + // This test also verifies that a file's blocks are purged from the cache when + // the file is written, even when the write takes the "succeeds on get status" + // path. std::vector<HttpRequest*> requests( {new FakeHttpRequest( + "Uri: https://storage.googleapis.com/bucket/path%2Fwriteable\n" + "Auth Token: fake_token\n" + "Range: 0-7\n", + "01234567"), + new FakeHttpRequest( "Uri: https://www.googleapis.com/upload/storage/v1/b/bucket/o?" - "uploadType=resumable&name=path%2Fwriteable.txt\n" + "uploadType=resumable&name=path%2Fwriteable\n" "Auth Token: fake_token\n" "Header X-Upload-Content-Length: 17\n" "Post: yes\n", @@ -410,19 +371,41 @@ TEST(GcsFileSystemTest, NewWritableFile_ResumeUploadSucceedsOnGetStatus) { "Auth Token: fake_token\n" "Header Content-Range: bytes */17\n" "Put: yes\n", - "", Status::OK(), nullptr, {}, 201)}); + "", Status::OK(), nullptr, {}, 201), + new FakeHttpRequest( + "Uri: https://storage.googleapis.com/bucket/path%2Fwriteable\n" + "Auth Token: fake_token\n" + "Range: 0-7\n", + "01234567")}); GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); - - std::unique_ptr<WritableFile> file; - TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/path/writeable.txt", &file)); - - TF_EXPECT_OK(file->Append("content1,")); - TF_EXPECT_OK(file->Append("content2")); - TF_EXPECT_OK(file->Close()); + 8 /* block size */, 8 /* max bytes */, + 3600 /* max staleness */, 0 /* initial retry delay */); + // Pull the file's first block into the cache. This will trigger the first + // HTTP request to GCS. + std::unique_ptr<RandomAccessFile> rfile; + TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/path/writeable", &rfile)); + char scratch[100]; + StringPiece result; + TF_EXPECT_OK(rfile->Read(0, 4, &result, scratch)); + EXPECT_EQ("0123", result); + // Now write to the same file. Once the write succeeds, the cached block will + // be flushed. + std::unique_ptr<WritableFile> wfile; + TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/path/writeable", &wfile)); + TF_EXPECT_OK(wfile->Append("content1,")); + TF_EXPECT_OK(wfile->Append("content2")); + // Appending doesn't invalidate the read cache - only flushing does. This read + // will not trigger an HTTP request to GCS. + TF_EXPECT_OK(rfile->Read(4, 4, &result, scratch)); + EXPECT_EQ("4567", result); + // Closing the file triggers HTTP requests to GCS and invalidates the read + // cache for the file. + TF_EXPECT_OK(wfile->Close()); + // Reading the first block of the file goes to GCS again. + TF_EXPECT_OK(rfile->Read(0, 8, &result, scratch)); + EXPECT_EQ("01234567", result); } TEST(GcsFileSystemTest, NewWritableFile_ResumeUploadAllAttemptsFail) { @@ -472,8 +455,8 @@ TEST(GcsFileSystemTest, NewWritableFile_ResumeUploadAllAttemptsFail) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 2 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 2 /* initial retry delay */); std::unique_ptr<WritableFile> file; TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/path/writeable.txt", &file)); @@ -520,8 +503,8 @@ TEST(GcsFileSystemTest, NewWritableFile_UploadReturns410) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); std::unique_ptr<WritableFile> file; TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/path/writeable.txt", &file)); @@ -546,8 +529,8 @@ TEST(GcsFileSystemTest, NewWritableFile_NoObjectName) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); std::unique_ptr<WritableFile> file; EXPECT_EQ(errors::Code::INVALID_ARGUMENT, @@ -557,13 +540,13 @@ TEST(GcsFileSystemTest, NewWritableFile_NoObjectName) { TEST(GcsFileSystemTest, NewAppendableFile) { std::vector<HttpRequest*> requests( {new FakeHttpRequest( - "Uri: https://storage.googleapis.com/bucket/path%2Fappendable.txt\n" + "Uri: https://storage.googleapis.com/bucket/path%2Fappendable\n" "Auth Token: fake_token\n" - "Range: 0-1048575\n", + "Range: 0-31\n", "content1,"), new FakeHttpRequest( "Uri: https://www.googleapis.com/upload/storage/v1/b/bucket/o?" - "uploadType=resumable&name=path%2Fappendable.txt\n" + "uploadType=resumable&name=path%2Fappendable\n" "Auth Token: fake_token\n" "Header X-Upload-Content-Length: 17\n" "Post: yes\n", @@ -572,18 +555,38 @@ TEST(GcsFileSystemTest, NewAppendableFile) { "Auth Token: fake_token\n" "Header Content-Range: bytes 0-16/17\n" "Put body: content1,content2\n", - "")}); + ""), + new FakeHttpRequest( + "Uri: https://storage.googleapis.com/bucket/path%2Fappendable\n" + "Auth Token: fake_token\n" + "Range: 0-31\n", + "01234567")}); GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, + 32 /* block size */, 32 /* max bytes */, 0 /* max staleness */, 0 /* initial retry delay */); - std::unique_ptr<WritableFile> file; - TF_EXPECT_OK(fs.NewAppendableFile("gs://bucket/path/appendable.txt", &file)); - - TF_EXPECT_OK(file->Append("content2")); - TF_EXPECT_OK(file->Close()); + // Create an appendable file. This should read the file from GCS, and pull its + // contents into the block cache. + std::unique_ptr<WritableFile> wfile; + TF_EXPECT_OK(fs.NewAppendableFile("gs://bucket/path/appendable", &wfile)); + TF_EXPECT_OK(wfile->Append("content2")); + // Verify that the file contents are in the block cache. This read should not + // trigger an HTTP request to GCS. + std::unique_ptr<RandomAccessFile> rfile; + TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/path/appendable", &rfile)); + char scratch[100]; + StringPiece result; + TF_EXPECT_OK(rfile->Read(0, 8, &result, scratch)); + EXPECT_EQ("content1", result); + // Closing the appendable file will flush its contents to GCS, triggering HTTP + // requests. + TF_EXPECT_OK(wfile->Close()); + // Redo the read. The block should be reloaded from GCS, causing one more HTTP + // request to load it. + TF_EXPECT_OK(rfile->Read(0, 4, &result, scratch)); + EXPECT_EQ("0123", result); } TEST(GcsFileSystemTest, NewAppendableFile_NoObjectName) { @@ -591,8 +594,8 @@ TEST(GcsFileSystemTest, NewAppendableFile_NoObjectName) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); std::unique_ptr<WritableFile> file; EXPECT_EQ(errors::Code::INVALID_ARGUMENT, @@ -618,8 +621,8 @@ TEST(GcsFileSystemTest, NewReadOnlyMemoryRegionFromFile) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); std::unique_ptr<ReadOnlyMemoryRegion> region; TF_EXPECT_OK(fs.NewReadOnlyMemoryRegionFromFile( @@ -634,8 +637,8 @@ TEST(GcsFileSystemTest, NewReadOnlyMemoryRegionFromFile_NoObjectName) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); std::unique_ptr<ReadOnlyMemoryRegion> region; EXPECT_EQ(errors::Code::INVALID_ARGUMENT, @@ -652,8 +655,8 @@ TEST(GcsFileSystemTest, FileExists_YesAsObject) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); TF_EXPECT_OK(fs.FileExists("gs://bucket/path/file1.txt")); } @@ -675,8 +678,8 @@ TEST(GcsFileSystemTest, FileExists_YesAsFolder) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); TF_EXPECT_OK(fs.FileExists("gs://bucket/path/subfolder")); } @@ -694,8 +697,8 @@ TEST(GcsFileSystemTest, FileExists_YesAsBucket) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); TF_EXPECT_OK(fs.FileExists("gs://bucket1")); TF_EXPECT_OK(fs.FileExists("gs://bucket1/")); @@ -717,8 +720,8 @@ TEST(GcsFileSystemTest, FileExists_NotAsObjectOrFolder) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); EXPECT_EQ(errors::Code::NOT_FOUND, fs.FileExists("gs://bucket/path/file1.txt").code()); @@ -737,8 +740,8 @@ TEST(GcsFileSystemTest, FileExists_NotAsBucket) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); EXPECT_EQ(errors::Code::INVALID_ARGUMENT, fs.FileExists("gs://bucket2/").code()); EXPECT_EQ(errors::Code::INVALID_ARGUMENT, @@ -755,8 +758,8 @@ TEST(GcsFileSystemTest, GetChildren_NoItems) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); std::vector<string> children; TF_EXPECT_OK(fs.GetChildren("gs://bucket/path/", &children)); @@ -777,8 +780,8 @@ TEST(GcsFileSystemTest, GetChildren_ThreeFiles) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); std::vector<string> children; TF_EXPECT_OK(fs.GetChildren("gs://bucket/path/", &children)); @@ -800,8 +803,8 @@ TEST(GcsFileSystemTest, GetChildren_SelfDirectoryMarker) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); std::vector<string> children; TF_EXPECT_OK(fs.GetChildren("gs://bucket/path/", &children)); @@ -822,8 +825,8 @@ TEST(GcsFileSystemTest, GetChildren_ThreeFiles_NoSlash) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); std::vector<string> children; TF_EXPECT_OK(fs.GetChildren("gs://bucket/path", &children)); @@ -841,8 +844,8 @@ TEST(GcsFileSystemTest, GetChildren_Root) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); std::vector<string> children; TF_EXPECT_OK(fs.GetChildren("gs://bucket-a-b-c", &children)); @@ -860,8 +863,8 @@ TEST(GcsFileSystemTest, GetChildren_Empty) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); std::vector<string> children; TF_EXPECT_OK(fs.GetChildren("gs://bucket/path/", &children)); @@ -894,8 +897,8 @@ TEST(GcsFileSystemTest, GetChildren_Pagination) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); std::vector<string> children; TF_EXPECT_OK(fs.GetChildren("gs://bucket/path", &children)); @@ -915,8 +918,8 @@ TEST(GcsFileSystemTest, GetMatchingPaths_NoWildcard) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); std::vector<string> result; TF_EXPECT_OK( @@ -937,8 +940,8 @@ TEST(GcsFileSystemTest, GetMatchingPaths_BucketAndWildcard) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); std::vector<string> result; TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/*/*", &result)); @@ -960,8 +963,8 @@ TEST(GcsFileSystemTest, GetMatchingPaths_FolderAndWildcard_Matches) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); std::vector<string> result; TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/*/file2.txt", &result)); @@ -980,8 +983,8 @@ TEST(GcsFileSystemTest, GetMatchingPaths_SelfDirectoryMarker) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); std::vector<string> result; TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/*", &result)); @@ -1000,8 +1003,8 @@ TEST(GcsFileSystemTest, GetMatchingPaths_FolderAndWildcard_NoMatches) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); std::vector<string> result; TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/*/file3.txt", &result)); @@ -1013,8 +1016,8 @@ TEST(GcsFileSystemTest, GetMatchingPaths_OnlyWildcard) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); std::vector<string> result; EXPECT_EQ(errors::Code::INVALID_ARGUMENT, @@ -1023,18 +1026,40 @@ TEST(GcsFileSystemTest, GetMatchingPaths_OnlyWildcard) { TEST(GcsFileSystemTest, DeleteFile) { std::vector<HttpRequest*> requests( - {new FakeHttpRequest("Uri: https://www.googleapis.com/storage/v1/b" + {new FakeHttpRequest( + "Uri: https://storage.googleapis.com/bucket/path%2Ffile1.txt\n" + "Auth Token: fake_token\n" + "Range: 0-15\n", + "01234567"), + new FakeHttpRequest("Uri: https://www.googleapis.com/storage/v1/b" "/bucket/o/path%2Ffile1.txt\n" "Auth Token: fake_token\n" "Delete: yes\n", - "")}); + ""), + new FakeHttpRequest( + "Uri: https://storage.googleapis.com/bucket/path%2Ffile1.txt\n" + "Auth Token: fake_token\n" + "Range: 0-15\n", + "76543210")}); GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, + 16 /* block size */, 16 /* max bytes */, 0 /* max staleness */, 0 /* initial retry delay */); + // Do an initial read of the file to load its contents into the block cache. + char scratch[100]; + StringPiece result; + std::unique_ptr<RandomAccessFile> file; + TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/path/file1.txt", &file)); + TF_EXPECT_OK(file->Read(0, 8, &result, scratch)); + EXPECT_EQ("01234567", result); + // Deleting the file triggers the next HTTP request to GCS. TF_EXPECT_OK(fs.DeleteFile("gs://bucket/path/file1.txt")); + // Re-reading the file causes its contents to be reloaded from GCS and not + // from the block cache. + TF_EXPECT_OK(file->Read(0, 8, &result, scratch)); + EXPECT_EQ("76543210", result); } TEST(GcsFileSystemTest, DeleteFile_NoObjectName) { @@ -1042,8 +1067,8 @@ TEST(GcsFileSystemTest, DeleteFile_NoObjectName) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); EXPECT_EQ(errors::Code::INVALID_ARGUMENT, fs.DeleteFile("gs://bucket/").code()); @@ -1058,8 +1083,8 @@ TEST(GcsFileSystemTest, DeleteDir_Empty) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); TF_EXPECT_OK(fs.DeleteDir("gs://bucket/path/")); } @@ -1080,8 +1105,8 @@ TEST(GcsFileSystemTest, DeleteDir_OnlyDirMarkerLeft) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); TF_EXPECT_OK(fs.DeleteDir("gs://bucket/path/")); } @@ -1094,8 +1119,8 @@ TEST(GcsFileSystemTest, DeleteDir_BucketOnly) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); TF_EXPECT_OK(fs.DeleteDir("gs://bucket")); } @@ -1110,8 +1135,8 @@ TEST(GcsFileSystemTest, DeleteDir_NonEmpty) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); EXPECT_EQ(error::Code::FAILED_PRECONDITION, fs.DeleteDir("gs://bucket/path/").code()); @@ -1127,8 +1152,8 @@ TEST(GcsFileSystemTest, GetFileSize) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); uint64 size; TF_EXPECT_OK(fs.GetFileSize("gs://bucket/file.txt", &size)); @@ -1140,8 +1165,8 @@ TEST(GcsFileSystemTest, GetFileSize_NoObjectName) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); uint64 size; EXPECT_EQ(errors::Code::INVALID_ARGUMENT, @@ -1213,15 +1238,25 @@ TEST(GcsFileSystemTest, RenameFile_Folder) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); TF_EXPECT_OK(fs.RenameFile("gs://bucket/path1", "gs://bucket/path2/")); } TEST(GcsFileSystemTest, RenameFile_Object) { std::vector<HttpRequest*> requests( - {// IsDirectory is checking whether there are children objects. + {new FakeHttpRequest( + "Uri: https://storage.googleapis.com/bucket/path%2Fsrc.txt\n" + "Auth Token: fake_token\n" + "Range: 0-15\n", + "01234567"), + new FakeHttpRequest( + "Uri: https://storage.googleapis.com/bucket/path%2Fdst.txt\n" + "Auth Token: fake_token\n" + "Range: 0-15\n", + "76543210"), + // IsDirectory is checking whether there are children objects. new FakeHttpRequest( "Uri: https://www.googleapis.com/storage/v1/b/bucket/o?" "fields=items%2Fname%2CnextPageToken&prefix=path%2Fsrc.txt%2F" @@ -1248,15 +1283,42 @@ TEST(GcsFileSystemTest, RenameFile_Object) { "path%2Fsrc.txt\n" "Auth Token: fake_token\n" "Delete: yes\n", - "")}); + ""), + new FakeHttpRequest( + "Uri: https://storage.googleapis.com/bucket/path%2Fsrc.txt\n" + "Auth Token: fake_token\n" + "Range: 0-15\n", + "89abcdef"), + new FakeHttpRequest( + "Uri: https://storage.googleapis.com/bucket/path%2Fdst.txt\n" + "Auth Token: fake_token\n" + "Range: 0-15\n", + "fedcba98")}); GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, + 16 /* block size */, 64 /* max bytes */, 0 /* max staleness */, 0 /* initial retry delay */); - + // Do an initial read of the source and destination files to load their + // contents into the block cache. + char scratch[100]; + StringPiece result; + std::unique_ptr<RandomAccessFile> src; + std::unique_ptr<RandomAccessFile> dst; + TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/path/src.txt", &src)); + TF_EXPECT_OK(src->Read(0, 8, &result, scratch)); + EXPECT_EQ("01234567", result); + TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/path/dst.txt", &dst)); + TF_EXPECT_OK(dst->Read(0, 8, &result, scratch)); + EXPECT_EQ("76543210", result); + // Now rename src to dst. This should flush the block cache for both files. TF_EXPECT_OK( fs.RenameFile("gs://bucket/path/src.txt", "gs://bucket/path/dst.txt")); + // Re-read both files. This should reload their contents from GCS. + TF_EXPECT_OK(src->Read(0, 8, &result, scratch)); + EXPECT_EQ("89abcdef", result); + TF_EXPECT_OK(dst->Read(0, 8, &result, scratch)); + EXPECT_EQ("fedcba98", result); } /// Tests the scenario when deletion returns a failure, but actually succeeds. @@ -1300,8 +1362,8 @@ TEST(GcsFileSystemTest, RenameFile_Object_DeletionRetried) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); TF_EXPECT_OK( fs.RenameFile("gs://bucket/path/src.txt", "gs://bucket/path/dst.txt")); @@ -1334,8 +1396,8 @@ TEST(GcsFileSystemTest, RenameFile_Object_Incomplete) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); EXPECT_EQ( errors::Code::UNIMPLEMENTED, @@ -1353,8 +1415,8 @@ TEST(GcsFileSystemTest, Stat_Object) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); FileStatistics stat; TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", &stat)); @@ -1380,8 +1442,8 @@ TEST(GcsFileSystemTest, Stat_Folder) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); FileStatistics stat; TF_EXPECT_OK(fs.Stat("gs://bucket/subfolder", &stat)); @@ -1406,8 +1468,8 @@ TEST(GcsFileSystemTest, Stat_ObjectOrFolderNotFound) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); FileStatistics stat; EXPECT_EQ(error::Code::NOT_FOUND, fs.Stat("gs://bucket/path", &stat).code()); @@ -1421,8 +1483,8 @@ TEST(GcsFileSystemTest, Stat_Bucket) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); FileStatistics stat; TF_EXPECT_OK(fs.Stat("gs://bucket/", &stat)); @@ -1439,8 +1501,8 @@ TEST(GcsFileSystemTest, Stat_BucketNotFound) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); FileStatistics stat; EXPECT_EQ(error::Code::NOT_FOUND, fs.Stat("gs://bucket/", &stat).code()); @@ -1462,8 +1524,8 @@ TEST(GcsFileSystemTest, IsDirectory_NotFound) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); EXPECT_EQ(error::Code::NOT_FOUND, fs.IsDirectory("gs://bucket/file.txt").code()); @@ -1486,8 +1548,8 @@ TEST(GcsFileSystemTest, IsDirectory_NotDirectoryButObject) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); EXPECT_EQ(error::Code::FAILED_PRECONDITION, fs.IsDirectory("gs://bucket/file.txt").code()); @@ -1510,8 +1572,8 @@ TEST(GcsFileSystemTest, IsDirectory_Yes) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); TF_EXPECT_OK(fs.IsDirectory("gs://bucket/subfolder")); TF_EXPECT_OK(fs.IsDirectory("gs://bucket/subfolder/")); @@ -1530,8 +1592,8 @@ TEST(GcsFileSystemTest, IsDirectory_Bucket) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); TF_EXPECT_OK(fs.IsDirectory("gs://bucket")); TF_EXPECT_OK(fs.IsDirectory("gs://bucket/")); @@ -1545,8 +1607,8 @@ TEST(GcsFileSystemTest, IsDirectory_BucketNotFound) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); EXPECT_EQ(error::Code::NOT_FOUND, fs.IsDirectory("gs://bucket/").code()); } @@ -1578,8 +1640,8 @@ TEST(GcsFileSystemTest, CreateDir_Folder) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); TF_EXPECT_OK(fs.CreateDir("gs://bucket/subpath")); TF_EXPECT_OK(fs.CreateDir("gs://bucket/subpath/")); @@ -1598,8 +1660,8 @@ TEST(GcsFileSystemTest, CreateDir_Bucket) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); TF_EXPECT_OK(fs.CreateDir("gs://bucket/")); TF_EXPECT_OK(fs.CreateDir("gs://bucket")); @@ -1658,8 +1720,8 @@ TEST(GcsFileSystemTest, DeleteRecursively_Ok) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); int64 undeleted_files, undeleted_dirs; TF_EXPECT_OK(fs.DeleteRecursively("gs://bucket/path", &undeleted_files, @@ -1737,8 +1799,8 @@ TEST(GcsFileSystemTest, DeleteRecursively_DeletionErrors) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); int64 undeleted_files, undeleted_dirs; TF_EXPECT_OK(fs.DeleteRecursively("gs://bucket/path", &undeleted_files, @@ -1765,8 +1827,8 @@ TEST(GcsFileSystemTest, DeleteRecursively_NotAFolder) { GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), - 0 /* block size */, 0 /* block count */, - 0 /* max staleness */, 0 /* initial retry delay */); + 0 /* block size */, 0 /* max bytes */, 0 /* max staleness */, + 0 /* initial retry delay */); int64 undeleted_files, undeleted_dirs; EXPECT_EQ(error::Code::NOT_FOUND, @@ -1781,7 +1843,7 @@ TEST(GcsFileSystemTest, OverrideCacheParameters) { // Verify defaults are propagated correctly. GcsFileSystem fs1; EXPECT_EQ(256 * 1024 * 1024, fs1.block_size()); - EXPECT_EQ(1, fs1.block_count()); + EXPECT_EQ(fs1.block_size(), fs1.max_bytes()); EXPECT_EQ(0, fs1.max_staleness()); // Verify legacy readahead buffer override sets block size. @@ -1794,10 +1856,10 @@ TEST(GcsFileSystemTest, OverrideCacheParameters) { GcsFileSystem fs3; EXPECT_EQ(1048576L, fs3.block_size()); - // Verify block count override. - setenv("GCS_READ_CACHE_BLOCK_COUNT", "16", 1); + // Verify max size override. + setenv("GCS_READ_CACHE_MAX_SIZE_MB", "16", 1); GcsFileSystem fs4; - EXPECT_EQ(16, fs4.block_count()); + EXPECT_EQ(16 * 1024 * 1024, fs4.max_bytes()); // Verify max staleness override. setenv("GCS_READ_CACHE_MAX_STALENESS", "60", 1); |