diff options
author | 2017-07-13 18:32:07 -0700 | |
---|---|---|
committer | 2017-07-13 18:47:50 -0700 | |
commit | a641129c9b0b801bc14da3d87d28777926b2540c (patch) | |
tree | 00a89d05bb65baed071b45802e0b733bc960a47a | |
parent | 3f538fadf2dedaeddc295580eaff635c98f1793a (diff) |
Adds a file cache to the GCS filesystem, with configurable max staleness for file contents. Maximum staleness is specified in seconds in the GCS_READ_CACHE_MAX_STALENESS environment variable, and defaults to 0 (indicating that staleness is not tolerated in a newly opened file). Staleness is measured from the arrival time of the first block in the file's block cache.
PiperOrigin-RevId: 161897769
-rw-r--r-- | tensorflow/core/platform/cloud/BUILD | 1 | ||||
-rw-r--r-- | tensorflow/core/platform/cloud/file_block_cache.cc | 25 | ||||
-rw-r--r-- | tensorflow/core/platform/cloud/file_block_cache.h | 23 | ||||
-rw-r--r-- | tensorflow/core/platform/cloud/file_block_cache_test.cc | 47 | ||||
-rw-r--r-- | tensorflow/core/platform/cloud/gcs_file_system.cc | 169 | ||||
-rw-r--r-- | tensorflow/core/platform/cloud/gcs_file_system.h | 19 | ||||
-rw-r--r-- | tensorflow/core/platform/cloud/gcs_file_system_test.cc | 239 |
7 files changed, 383 insertions, 140 deletions
diff --git a/tensorflow/core/platform/cloud/BUILD b/tensorflow/core/platform/cloud/BUILD index 183a447012..67cd1bb2c6 100644 --- a/tensorflow/core/platform/cloud/BUILD +++ b/tensorflow/core/platform/cloud/BUILD @@ -170,6 +170,7 @@ tf_cc_test( srcs = ["file_block_cache_test.cc"], deps = [ ":file_block_cache", + "//tensorflow/core:lib", "//tensorflow/core:test", "//tensorflow/core:test_main", ], diff --git a/tensorflow/core/platform/cloud/file_block_cache.cc b/tensorflow/core/platform/cloud/file_block_cache.cc index b794614cd7..84ef0ea1fe 100644 --- a/tensorflow/core/platform/cloud/file_block_cache.cc +++ b/tensorflow/core/platform/cloud/file_block_cache.cc @@ -36,9 +36,16 @@ Status FileBlockCache::Read(uint64 offset, size_t n, std::vector<char>* out) { if (finish < offset + n) { finish += block_size_; } + mutex_lock lock(mu_); + // If the oldest block arrived max_staleness_ in the past, flush the cache. + // Note that if max_staleness_ is 0, we don't expire any cached blocks. + if (max_staleness_ > 0 && timestamp_ > 0 && + env_->NowSeconds() - timestamp_ > max_staleness_) { + TrimCache(0); + timestamp_ = 0; + } // Now iterate through the blocks, reading them one at a time. Reads are // locked so that only one block_fetcher call is active at any given time. - mutex_lock lock(mu_); for (uint64 pos = start; pos < finish; pos += block_size_) { auto entry = block_map_.find(pos); if (entry == block_map_.end()) { @@ -47,10 +54,7 @@ Status FileBlockCache::Read(uint64 offset, size_t n, std::vector<char>* out) { // time during which the cache size exceeds its desired limit. The // tradeoff is that if the fetcher fails, the cache may evict a block // prematurely. - while (lru_list_.size() >= block_count_) { - block_map_.erase(lru_list_.back()); - lru_list_.pop_back(); - } + TrimCache(block_count_ - 1); std::unique_ptr<Block> block(new Block); TF_RETURN_IF_ERROR(block_fetcher_(pos, block_size_, &block->data)); // Sanity check to detect interrupted reads leading to partial blocks: a @@ -62,6 +66,10 @@ Status FileBlockCache::Read(uint64 offset, size_t n, std::vector<char>* out) { return errors::FailedPrecondition("File contents are inconsistent"); } entry = block_map_.emplace(std::make_pair(pos, std::move(block))).first; + if (timestamp_ == 0) { + // Mark the timestamp of the first block's arrival in the cache. + timestamp_ = env_->NowSeconds(); + } } else { // Cache hit. Remove the block from the LRU list at its prior location. lru_list_.erase(entry->second->lru_iterator); @@ -94,4 +102,11 @@ Status FileBlockCache::Read(uint64 offset, size_t n, std::vector<char>* out) { return Status::OK(); } +void FileBlockCache::TrimCache(size_t size) { + while (lru_list_.size() > size) { + block_map_.erase(lru_list_.back()); + lru_list_.pop_back(); + } +} + } // namespace tensorflow diff --git a/tensorflow/core/platform/cloud/file_block_cache.h b/tensorflow/core/platform/cloud/file_block_cache.h index 442b1b0319..4acc4c7fca 100644 --- a/tensorflow/core/platform/cloud/file_block_cache.h +++ b/tensorflow/core/platform/cloud/file_block_cache.h @@ -24,6 +24,7 @@ limitations under the License. #include <vector> #include "tensorflow/core/lib/core/status.h" #include "tensorflow/core/lib/core/stringpiece.h" +#include "tensorflow/core/platform/env.h" #include "tensorflow/core/platform/mutex.h" #include "tensorflow/core/platform/thread_annotations.h" #include "tensorflow/core/platform/types.h" @@ -44,11 +45,13 @@ class FileBlockCache { typedef std::function<Status(uint64, size_t, std::vector<char>*)> BlockFetcher; - FileBlockCache(uint64 block_size, uint32 block_count, - BlockFetcher block_fetcher) + FileBlockCache(uint64 block_size, uint32 block_count, uint64 max_staleness, + BlockFetcher block_fetcher, Env* env = Env::Default()) : block_size_(block_size), block_count_(block_count), - block_fetcher_(block_fetcher) {} + max_staleness_(max_staleness), + block_fetcher_(block_fetcher), + env_(env) {} /// Read `n` bytes starting at `offset` into `out`. This method will return: /// @@ -66,13 +69,20 @@ class FileBlockCache { Status Read(uint64 offset, size_t n, std::vector<char>* out); private: + /// Trim the LRU cache until its size is at most `size` blocks. + void TrimCache(size_t size) EXCLUSIVE_LOCKS_REQUIRED(mu_); + /// The size of the blocks stored in the LRU cache, as well as the size of the /// reads from the underlying filesystem. const uint64 block_size_; /// The maximum number of blocks allowed in the LRU cache. const uint32 block_count_; + /// The maximum staleness of any block in the LRU cache, in seconds. + const uint64 max_staleness_; /// The callback to read a block from the underlying filesystem. const BlockFetcher block_fetcher_; + /// The Env from which we read timestamps. + Env* const env_; // not owned /// \brief A block of a file. /// @@ -85,7 +95,7 @@ class FileBlockCache { std::list<uint64>::iterator lru_iterator; }; - /// Guards access to the block map and LRU list. + /// Guards access to the block map, LRU list, and cache timestamp. mutex mu_; /// The block map (map from offset in the file to Block object). @@ -94,6 +104,11 @@ class FileBlockCache { /// The LRU list of offsets in the file. The front of the list is the position /// of the most recently accessed block. std::list<uint64> lru_list_ GUARDED_BY(mu_); + + /// The most recent timestamp (in seconds since epoch) at which the block map + /// transitioned from empty to non-empty. A value of 0 means the block map is + /// currently empty. + uint64 timestamp_ GUARDED_BY(mu_) = 0; }; } // namespace tensorflow diff --git a/tensorflow/core/platform/cloud/file_block_cache_test.cc b/tensorflow/core/platform/cloud/file_block_cache_test.cc index 4f62e4ee6a..8ff02ae00b 100644 --- a/tensorflow/core/platform/cloud/file_block_cache_test.cc +++ b/tensorflow/core/platform/cloud/file_block_cache_test.cc @@ -16,6 +16,7 @@ limitations under the License. #include "tensorflow/core/platform/cloud/file_block_cache.h" #include <cstring> #include "tensorflow/core/lib/core/status_test_util.h" +#include "tensorflow/core/platform/env.h" #include "tensorflow/core/platform/test.h" namespace tensorflow { @@ -34,9 +35,9 @@ TEST(FileBlockCacheTest, PassThrough) { return Status::OK(); }; // If block_size, block_count, or both are zero, the cache is a pass-through. - FileBlockCache cache1(1, 0, fetcher); - FileBlockCache cache2(0, 1, fetcher); - FileBlockCache cache3(0, 0, fetcher); + FileBlockCache cache1(1, 0, 0, fetcher); + FileBlockCache cache2(0, 1, 0, fetcher); + FileBlockCache cache3(0, 0, 0, fetcher); std::vector<char> out; TF_EXPECT_OK(cache1.Read(want_offset, want_n, &out)); EXPECT_EQ(calls, 1); @@ -68,7 +69,7 @@ TEST(FileBlockCacheTest, BlockAlignment) { for (uint64_t block_size = 2; block_size <= 4; block_size++) { // Make a cache of N-byte block size (1 block) and verify that reads of // varying offsets and lengths return correct data. - FileBlockCache cache(block_size, 1, fetcher); + FileBlockCache cache(block_size, 1, 0, fetcher); for (uint64_t offset = 0; offset < 10; offset++) { for (size_t n = block_size - 2; n <= block_size + 2; n++) { std::vector<char> got; @@ -109,7 +110,7 @@ TEST(FileBlockCacheTest, CacheHits) { return Status::OK(); }; const uint32 block_count = 256; - FileBlockCache cache(block_size, block_count, fetcher); + FileBlockCache cache(block_size, block_count, 0, fetcher); std::vector<char> out; // The cache has space for `block_count` blocks. The loop with i = 0 should // fill the cache, and the loop with i = 1 should be all cache hits. The @@ -143,7 +144,7 @@ TEST(FileBlockCacheTest, OutOfRange) { } return Status::OK(); }; - FileBlockCache cache(block_size, 1, fetcher); + FileBlockCache cache(block_size, 1, 0, fetcher); std::vector<char> out; // Reading the first 16 bytes should be fine. TF_EXPECT_OK(cache.Read(0, block_size, &out)); @@ -174,7 +175,7 @@ TEST(FileBlockCacheTest, Inconsistent) { out->resize(1, 'x'); return Status::OK(); }; - FileBlockCache cache(block_size, 2, fetcher); + FileBlockCache cache(block_size, 2, 0, fetcher); std::vector<char> out; // Read the second block; this should yield an OK status and a single byte. TF_EXPECT_OK(cache.Read(block_size, block_size, &out)); @@ -200,7 +201,7 @@ TEST(FileBlockCacheTest, LRU) { return Status::OK(); }; const uint32 block_count = 2; - FileBlockCache cache(block_size, block_count, fetcher); + FileBlockCache cache(block_size, block_count, 0, fetcher); std::vector<char> out; // Read blocks from the cache, and verify the LRU behavior based on the // fetcher calls that the cache makes. @@ -232,5 +233,35 @@ TEST(FileBlockCacheTest, LRU) { TF_EXPECT_OK(cache.Read(0, 1, &out)); } +TEST(FileBlockCacheTest, MaxStaleness) { + // This Env wrapper lets us control the NowSeconds() return value. + class FakeEnv : public EnvWrapper { + public: + FakeEnv() : EnvWrapper(Env::Default()) {} + uint64 NowSeconds() override { return now_; }; + uint64 now_ = 1; + }; + int calls = 0; + auto fetcher = [&calls](uint64 offset, size_t n, std::vector<char>* out) { + calls++; + out->resize(n, 'x'); + return Status::OK(); + }; + std::unique_ptr<FakeEnv> env(new FakeEnv); + FileBlockCache cache(8, 1, 2 /* max staleness */, fetcher, env.get()); + std::vector<char> out; + // Execute the first read to load the block. + TF_EXPECT_OK(cache.Read(0, 1, &out)); + EXPECT_EQ(calls, 1); + // Now advance the clock one second at a time and redo the read. The call + // count should advance every 3 seconds (i.e. every time the staleness is + // greater than 2). + for (int i = 1; i <= 10; i++) { + env->now_ = i + 1; + TF_EXPECT_OK(cache.Read(0, 1, &out)); + EXPECT_EQ(calls, 1 + i / 3); + } +} + } // namespace } // namespace tensorflow diff --git a/tensorflow/core/platform/cloud/gcs_file_system.cc b/tensorflow/core/platform/cloud/gcs_file_system.cc index 9f00735304..85def60a1e 100644 --- a/tensorflow/core/platform/cloud/gcs_file_system.cc +++ b/tensorflow/core/platform/cloud/gcs_file_system.cc @@ -62,6 +62,10 @@ constexpr char kBlockSize[] = "GCS_READ_CACHE_BLOCK_SIZE_MB"; // The environment variable that overrides the block count in the LRU cache of // blocks read from GCS. constexpr char kBlockCount[] = "GCS_READ_CACHE_BLOCK_COUNT"; +// The environment variable that overrides the maximum staleness of cached file +// contents. Once any block of a file reaches this staleness, all cached blocks +// will be evicted on the next read. +constexpr char kMaxStaleness[] = "GCS_READ_CACHE_MAX_STALENESS"; // The file statistics returned by Stat() for directories. const FileStatistics DIRECTORY_STAT(0, 0, true); @@ -213,29 +217,19 @@ Status GetBoolValue(const Json::Value& parent, const string& name, return Status::OK(); } -/// A GCS-based implementation of a random access file with a read-ahead buffer. +/// A GCS-based implementation of a random access file with an LRU block cache. class GcsRandomAccessFile : public RandomAccessFile { public: - GcsRandomAccessFile(const string& bucket, const string& object, - AuthProvider* auth_provider, - HttpRequest::Factory* http_request_factory, - size_t block_size, uint32 block_count) - : bucket_(bucket), - object_(object), - auth_provider_(auth_provider), - http_request_factory_(http_request_factory), - file_block_cache_( - block_size, block_count, - [this](uint64 offset, size_t n, std::vector<char>* out) { - return LoadBufferFromGCS(offset, n, out); - }) {} + explicit GcsRandomAccessFile( + const std::shared_ptr<FileBlockCache>& file_block_cache) + : file_block_cache_(file_block_cache) {} /// The implementation of reads with an LRU block cache. Thread safe. Status Read(uint64 offset, size_t n, StringPiece* result, char* scratch) const override { result->clear(); std::vector<char> out; - TF_RETURN_IF_ERROR(file_block_cache_.Read(offset, n, &out)); + TF_RETURN_IF_ERROR(file_block_cache_->Read(offset, n, &out)); std::memcpy(scratch, out.data(), std::min(out.size(), n)); *result = StringPiece(scratch, std::min(out.size(), n)); if (result->size() < n) { @@ -249,29 +243,8 @@ class GcsRandomAccessFile : public RandomAccessFile { } private: - /// A helper function to actually read the data from GCS. - Status LoadBufferFromGCS(uint64_t offset, size_t n, std::vector<char>* out) { - string auth_token; - TF_RETURN_IF_ERROR(AuthProvider::GetToken(auth_provider_, &auth_token)); - - std::unique_ptr<HttpRequest> request(http_request_factory_->Create()); - TF_RETURN_IF_ERROR(request->Init()); - TF_RETURN_IF_ERROR( - request->SetUri(strings::StrCat("https://", kStorageHost, "/", bucket_, - "/", request->EscapeString(object_)))); - TF_RETURN_IF_ERROR(request->AddAuthBearerHeader(auth_token)); - TF_RETURN_IF_ERROR(request->SetRange(offset, offset + n - 1)); - TF_RETURN_IF_ERROR(request->SetResultBuffer(out)); - TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when reading gs://", - bucket_, "/", object_); - return Status::OK(); - } - - string bucket_; - string object_; - AuthProvider* auth_provider_; - HttpRequest::Factory* http_request_factory_; - mutable FileBlockCache file_block_cache_; + /// The LRU block cache for this file. + mutable std::shared_ptr<FileBlockCache> file_block_cache_; }; /// \brief GCS-based implementation of a writeable file. @@ -283,11 +256,13 @@ class GcsWritableFile : public WritableFile { GcsWritableFile(const string& bucket, const string& object, AuthProvider* auth_provider, HttpRequest::Factory* http_request_factory, + std::function<void()> file_cache_erase, int64 initial_retry_delay_usec) : bucket_(bucket), object_(object), auth_provider_(auth_provider), http_request_factory_(http_request_factory), + file_cache_erase_(std::move(file_cache_erase)), sync_needed_(true), initial_retry_delay_usec_(initial_retry_delay_usec) { if (GetTmpFilename(&tmp_content_filename_).ok()) { @@ -305,11 +280,13 @@ class GcsWritableFile : public WritableFile { AuthProvider* auth_provider, const string& tmp_content_filename, HttpRequest::Factory* http_request_factory, + std::function<void()> file_cache_erase, int64 initial_retry_delay_usec) : bucket_(bucket), object_(object), auth_provider_(auth_provider), http_request_factory_(http_request_factory), + file_cache_erase_(std::move(file_cache_erase)), sync_needed_(true), initial_retry_delay_usec_(initial_retry_delay_usec) { tmp_content_filename_ = tmp_content_filename; @@ -377,6 +354,8 @@ class GcsWritableFile : public WritableFile { TF_RETURN_IF_ERROR(RequestUploadSessionStatus( session_uri, &completed, &already_uploaded)); if (completed) { + // Erase the file from the file cache on every successful write. + file_cache_erase_(); // It's unclear why UploadToSession didn't return OK in the // previous attempt, but GCS reports that the file is fully // uploaded, so succeed. @@ -529,6 +508,8 @@ class GcsWritableFile : public WritableFile { request->SetPutFromFile(tmp_content_filename_, start_offset)); TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when uploading ", GetGcsPath()); + // Erase the file from the file cache on every successful write. + file_cache_erase_(); return Status::OK(); } @@ -542,6 +523,7 @@ class GcsWritableFile : public WritableFile { string tmp_content_filename_; std::ofstream outfile_; HttpRequest::Factory* http_request_factory_; + std::function<void()> file_cache_erase_; bool sync_needed_; // whether there is buffered data that needs to be synced int64 initial_retry_delay_usec_; }; @@ -557,55 +539,107 @@ class GcsReadOnlyMemoryRegion : public ReadOnlyMemoryRegion { std::unique_ptr<char[]> data_; uint64 length_; }; + +// Helper function to extract an environment variable and convert it into a +// value of type T. +template <typename T> +bool GetEnvVar(const char* varname, bool (*convert)(StringPiece, T*), + T* value) { + const char* env_value = std::getenv(varname); + if (!env_value) { + return false; + } + return convert(env_value, value); +} + } // namespace GcsFileSystem::GcsFileSystem() : auth_provider_(new GoogleAuthProvider()), http_request_factory_(new HttpRequest::Factory()) { // Apply the sys env override for the readahead buffer size if it's provided. - const char* readahead_buffer_size_env = std::getenv(kReadaheadBufferSize); - if (readahead_buffer_size_env) { - uint64 value; - if (strings::safe_strtou64(readahead_buffer_size_env, &value)) { - block_size_ = value; - } + uint64 v64; + if (GetEnvVar(kReadaheadBufferSize, strings::safe_strtou64, &v64)) { + block_size_ = v64; } // Apply the override for the block size if provided. This takes precedence // over the readahead buffer size. - const char* block_size_env = std::getenv(kBlockSize); - if (block_size_env) { - uint64 value; - if (strings::safe_strtou64(block_size_env, &value)) { - block_size_ = value * 1024 * 1024; - } + if (GetEnvVar(kBlockSize, strings::safe_strtou64, &v64)) { + block_size_ = v64 * 1024 * 1024; } // Apply the override for the block count if provided. - const char* block_count_env = std::getenv(kBlockCount); - if (block_count_env) { - uint32 value; - if (strings::safe_strtou32(block_count_env, &value)) { - block_count_ = value; - } + uint32 v32; + if (GetEnvVar(kBlockCount, strings::safe_strtou32, &v32)) { + block_count_ = v32; + } + // Apply the override for max staleness if provided. + if (GetEnvVar(kMaxStaleness, strings::safe_strtou64, &v64)) { + max_staleness_ = v64; } } GcsFileSystem::GcsFileSystem( std::unique_ptr<AuthProvider> auth_provider, std::unique_ptr<HttpRequest::Factory> http_request_factory, - size_t block_size, uint32 block_count, int64 initial_retry_delay_usec) + size_t block_size, uint32 block_count, uint64 max_staleness, + int64 initial_retry_delay_usec) : auth_provider_(std::move(auth_provider)), http_request_factory_(std::move(http_request_factory)), block_size_(block_size), block_count_(block_count), + max_staleness_(max_staleness), initial_retry_delay_usec_(initial_retry_delay_usec) {} Status GcsFileSystem::NewRandomAccessFile( const string& fname, std::unique_ptr<RandomAccessFile>* result) { string bucket, object; TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object)); - result->reset(new GcsRandomAccessFile(bucket, object, auth_provider_.get(), - http_request_factory_.get(), - block_size_, block_count_)); + // `file_cache_` is a container of FileBlockCache, keyed by filename. We look + // up the filename in this container to see if we have a FileBlockCache for + // it. If the FileBlockCache for `fname` exists in file_cache_, we return it. + // Otherwise, we create a new FileBlockCache with a block fetcher that calls + // GcsFileSystem::LoadBufferFromGCS for the bucket and object derived from + // `fname`. If a FileBlockCache is created, it is added to `file_cache_` only + // if `max_staleness_` > 0 (indicating that new random acesss files will + // tolerate stale reads coming from FileBlockCache instances that persist + // across file close/open boundaries). + std::shared_ptr<FileBlockCache> file_block_cache; + mutex_lock lock(mu_); + auto entry = file_cache_.find(fname); + if (entry == file_cache_.end()) { + file_block_cache.reset(new FileBlockCache( + block_size_, block_count_, max_staleness_, + [this, bucket, object](uint64 offset, size_t n, + std::vector<char>* out) { + return LoadBufferFromGCS(bucket, object, offset, n, out); + })); + if (max_staleness_ > 0) { + file_cache_[fname] = file_block_cache; + } + } else { + file_block_cache = entry->second; + } + result->reset(new GcsRandomAccessFile(file_block_cache)); + return Status::OK(); +} + +// A helper function to actually read the data from GCS. +Status GcsFileSystem::LoadBufferFromGCS(const string& bucket, + const string& object, uint64_t offset, + size_t n, std::vector<char>* out) { + string auth_token; + TF_RETURN_IF_ERROR(AuthProvider::GetToken(auth_provider_.get(), &auth_token)); + + std::unique_ptr<HttpRequest> request(http_request_factory_->Create()); + TF_RETURN_IF_ERROR(request->Init()); + TF_RETURN_IF_ERROR( + request->SetUri(strings::StrCat("https://", kStorageHost, "/", bucket, + "/", request->EscapeString(object)))); + TF_RETURN_IF_ERROR(request->AddAuthBearerHeader(auth_token)); + TF_RETURN_IF_ERROR(request->SetRange(offset, offset + n - 1)); + TF_RETURN_IF_ERROR(request->SetResultBuffer(out)); + TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when reading gs://", + bucket, "/", object); return Status::OK(); } @@ -615,6 +649,10 @@ Status GcsFileSystem::NewWritableFile(const string& fname, TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object)); result->reset(new GcsWritableFile(bucket, object, auth_provider_.get(), http_request_factory_.get(), + [this, fname]() { + mutex_lock lock(mu_); + file_cache_.erase(fname); + }, initial_retry_delay_usec_)); return Status::OK(); } @@ -653,9 +691,14 @@ Status GcsFileSystem::NewAppendableFile(const string& fname, // Create a writable file and pass the old content to it. string bucket, object; TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object)); - result->reset(new GcsWritableFile( - bucket, object, auth_provider_.get(), old_content_filename, - http_request_factory_.get(), initial_retry_delay_usec_)); + result->reset(new GcsWritableFile(bucket, object, auth_provider_.get(), + old_content_filename, + http_request_factory_.get(), + [this, fname]() { + mutex_lock lock(mu_); + file_cache_.erase(fname); + }, + initial_retry_delay_usec_)); return Status::OK(); } diff --git a/tensorflow/core/platform/cloud/gcs_file_system.h b/tensorflow/core/platform/cloud/gcs_file_system.h index ef8899892e..6030b16aa0 100644 --- a/tensorflow/core/platform/cloud/gcs_file_system.h +++ b/tensorflow/core/platform/cloud/gcs_file_system.h @@ -20,6 +20,7 @@ limitations under the License. #include <vector> #include "tensorflow/core/lib/core/status.h" #include "tensorflow/core/platform/cloud/auth_provider.h" +#include "tensorflow/core/platform/cloud/file_block_cache.h" #include "tensorflow/core/platform/cloud/http_request.h" #include "tensorflow/core/platform/cloud/retrying_file_system.h" #include "tensorflow/core/platform/file_system.h" @@ -35,7 +36,7 @@ class GcsFileSystem : public FileSystem { GcsFileSystem(); GcsFileSystem(std::unique_ptr<AuthProvider> auth_provider, std::unique_ptr<HttpRequest::Factory> http_request_factory, - size_t block_size, uint32 block_count, + size_t block_size, uint32 block_count, uint64 max_staleness, int64 initial_retry_delay_usec); Status NewRandomAccessFile( @@ -77,6 +78,7 @@ class GcsFileSystem : public FileSystem { int64* undeleted_dirs) override; size_t block_size() const { return block_size_; } uint32 block_count() const { return block_count_; } + uint64 max_staleness() const { return max_staleness_; } private: /// \brief Checks if the bucket exists. Returns OK if the check succeeded. @@ -110,9 +112,19 @@ class GcsFileSystem : public FileSystem { FileStatistics* stat); Status RenameObject(const string& src, const string& target); + /// Loads file contents from GCS for a given bucket and object. + Status LoadBufferFromGCS(const string& bucket, const string& object, + uint64_t offset, size_t n, std::vector<char>* out); + std::unique_ptr<AuthProvider> auth_provider_; std::unique_ptr<HttpRequest::Factory> http_request_factory_; + /// Guards access to the file cache. + mutex mu_; + + /// Cache from filename to FileBlockCache. Populated iff max_staleness_ > 0. + std::map<string, std::shared_ptr<FileBlockCache>> file_cache_ GUARDED_BY(mu_); + /// The block size for block-aligned reads in the RandomAccessFile /// implementation. Defaults to 256Mb. size_t block_size_ = 256 * 1024 * 1024; @@ -121,6 +133,11 @@ class GcsFileSystem : public FileSystem { /// implementation. Defaults to 1. uint32 block_count_ = 1; + /// The maximum staleness of cached file blocks across close/open boundaries. + /// Defaults to 0, meaning that file blocks do not persist across close/open + /// boundaries. + uint64 max_staleness_ = 0; + /// The initial delay for exponential backoffs when retrying failed calls. const int64 initial_retry_delay_usec_ = 1000000L; diff --git a/tensorflow/core/platform/cloud/gcs_file_system_test.cc b/tensorflow/core/platform/cloud/gcs_file_system_test.cc index defedfdb93..915a44ad7f 100644 --- a/tensorflow/core/platform/cloud/gcs_file_system_test.cc +++ b/tensorflow/core/platform/cloud/gcs_file_system_test.cc @@ -46,7 +46,7 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_NoBlockCache) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); std::unique_ptr<RandomAccessFile> file; TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file)); @@ -81,7 +81,7 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_NoBlockCache_differentN) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); std::unique_ptr<RandomAccessFile> file; TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file)); @@ -131,7 +131,7 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_WithBlockCache) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 9 /* block size */, 2 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); char scratch[100]; StringPiece result; @@ -181,19 +181,134 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_WithBlockCache) { EXPECT_EQ("0123", result); } +TEST(GcsFileSystemTest, NewRandomAccessFile_WithBlockCache_MaxStaleness) { + // Our underlying file in this test is a 16 byte file with contents + // "0123456789abcdef". + std::vector<HttpRequest*> requests( + {new FakeHttpRequest("Uri: https://storage.googleapis.com/bucket/object\n" + "Auth Token: fake_token\n" + "Range: 0-7\n", + "01234567"), + new FakeHttpRequest("Uri: https://storage.googleapis.com/bucket/object\n" + "Auth Token: fake_token\n" + "Range: 8-15\n", + "89abcdef")}); + GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), + std::unique_ptr<HttpRequest::Factory>( + new FakeHttpRequestFactory(&requests)), + 8 /* block size */, 2 /* block count */, + 3600 /* max staleness */, 0 /* initial retry delay */); + char scratch[100]; + StringPiece result; + // There should only be two HTTP requests issued to GCS even though we iterate + // this loop 10 times. This shows that the underlying FileBlockCache persists + // across file close/open boundaries. + for (int i = 0; i < 10; i++) { + // Create two files. Since these files have the same name name and the max + // staleness of the filesystem is > 0, they will share the same block cache. + std::unique_ptr<RandomAccessFile> file1; + std::unique_ptr<RandomAccessFile> file2; + TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/object", &file1)); + TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/object", &file2)); + // Reading the first block from file1 should load it once. + TF_EXPECT_OK(file1->Read(0, 8, &result, scratch)); + EXPECT_EQ("01234567", result); + // Reading the first block from file2 should not trigger a request to load + // the first block again, because the FileBlockCache shared by file1 and + // file2 already has the first block. + TF_EXPECT_OK(file2->Read(0, 8, &result, scratch)); + EXPECT_EQ("01234567", result); + // Reading the second block from file2 should load it once. + TF_EXPECT_OK(file2->Read(8, 8, &result, scratch)); + EXPECT_EQ("89abcdef", result); + // Reading the second block from file1 should not trigger a request to load + // the second block again, because the FileBlockCache shared by file1 and + // file2 already has the second block. + TF_EXPECT_OK(file1->Read(8, 8, &result, scratch)); + EXPECT_EQ("89abcdef", result); + } +} + TEST(GcsFileSystemTest, NewRandomAccessFile_NoObjectName) { std::vector<HttpRequest*> requests; GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* read ahead bytes */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); std::unique_ptr<RandomAccessFile> file; EXPECT_EQ(errors::Code::INVALID_ARGUMENT, fs.NewRandomAccessFile("gs://bucket/", &file).code()); } +TEST(GcsFileSystemTest, NewWritableFile_FlushesFileCache) { + // Our underlying file in this test is a 16 byte file with initial contents + // "0123456789abcdef", replaced by new contents "08192a3b4c5d6e7f". + std::vector<HttpRequest*> requests( + {new FakeHttpRequest("Uri: https://storage.googleapis.com/bucket/object\n" + "Auth Token: fake_token\n" + "Range: 0-7\n", + "01234567"), + new FakeHttpRequest("Uri: https://storage.googleapis.com/bucket/object\n" + "Auth Token: fake_token\n" + "Range: 8-15\n", + "89abcdef"), + new FakeHttpRequest( + "Uri: https://www.googleapis.com/upload/storage/v1/b/bucket/o?" + "uploadType=resumable&name=object\n" + "Auth Token: fake_token\n" + "Header X-Upload-Content-Length: 16\n" + "Post: yes\n", + "", {{"Location", "https://custom/upload/location"}}), + new FakeHttpRequest("Uri: https://custom/upload/location\n" + "Auth Token: fake_token\n" + "Header Content-Range: bytes 0-15/16\n" + "Put body: 08192a3b4c5d6e7f\n", + ""), + new FakeHttpRequest("Uri: https://storage.googleapis.com/bucket/object\n" + "Auth Token: fake_token\n" + "Range: 0-7\n", + "08192a3b"), + new FakeHttpRequest("Uri: https://storage.googleapis.com/bucket/object\n" + "Auth Token: fake_token\n" + "Range: 8-15\n", + "4c5d6e7f")}); + GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), + std::unique_ptr<HttpRequest::Factory>( + new FakeHttpRequestFactory(&requests)), + 8 /* block size */, 2 /* block count */, + 3600 /* max staleness */, 0 /* initial retry delay */); + // First, read both blocks of the file with its initial contents. This should + // trigger the first two HTTP requests to GCS, to load each of the blocks in + // order. + std::unique_ptr<RandomAccessFile> rfile; + TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/object", &rfile)); + char scratch[100]; + StringPiece result; + TF_EXPECT_OK(rfile->Read(0, 16, &result, scratch)); + EXPECT_EQ("0123456789abcdef", result); + // Now open a writable file and write the new file contents. This should + // trigger the next two HTTP requests to GCS. + std::unique_ptr<WritableFile> wfile; + TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/object", &wfile)); + TF_EXPECT_OK(wfile->Append("08192a3b4c5d6e7f")); + // This rfile read should hit the block cache and not trigger any requests, + // because we haven't flushed the write yet. + TF_EXPECT_OK(rfile->Read(0, 16, &result, scratch)); + EXPECT_EQ("0123456789abcdef", result); + // Now flush the write. Any opens of gs://bucket/object after this point will + // no longer use the previous FileBlockCache for that file. + TF_EXPECT_OK(wfile->Flush()); + // Reopen `rfile` and read its contents again. This will trigger the next two + // HTTP requests to GCS, which will return the (updated) file contents. Note + // that the contents returned in those HTTP requests are not really relevant; + // the main thing is that both blocks of the file are reloaded. + TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/object", &rfile)); + TF_EXPECT_OK(rfile->Read(0, 16, &result, scratch)); + EXPECT_EQ("08192a3b4c5d6e7f", result); +} + TEST(GcsFileSystemTest, NewWritableFile) { std::vector<HttpRequest*> requests( {new FakeHttpRequest( @@ -212,7 +327,7 @@ TEST(GcsFileSystemTest, NewWritableFile) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); std::unique_ptr<WritableFile> file; TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/path/writeable.txt", &file)); @@ -267,7 +382,7 @@ TEST(GcsFileSystemTest, NewWritableFile_ResumeUploadSucceeds) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); std::unique_ptr<WritableFile> file; TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/path/writeable.txt", &file)); @@ -300,7 +415,7 @@ TEST(GcsFileSystemTest, NewWritableFile_ResumeUploadSucceedsOnGetStatus) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); std::unique_ptr<WritableFile> file; TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/path/writeable.txt", &file)); @@ -358,7 +473,7 @@ TEST(GcsFileSystemTest, NewWritableFile_ResumeUploadAllAttemptsFail) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 2 /* initial retry delay */); + 0 /* max staleness */, 2 /* initial retry delay */); std::unique_ptr<WritableFile> file; TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/path/writeable.txt", &file)); @@ -406,7 +521,7 @@ TEST(GcsFileSystemTest, NewWritableFile_UploadReturns410) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); std::unique_ptr<WritableFile> file; TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/path/writeable.txt", &file)); @@ -432,7 +547,7 @@ TEST(GcsFileSystemTest, NewWritableFile_NoObjectName) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); std::unique_ptr<WritableFile> file; EXPECT_EQ(errors::Code::INVALID_ARGUMENT, @@ -462,7 +577,7 @@ TEST(GcsFileSystemTest, NewAppendableFile) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); std::unique_ptr<WritableFile> file; TF_EXPECT_OK(fs.NewAppendableFile("gs://bucket/path/appendable.txt", &file)); @@ -477,7 +592,7 @@ TEST(GcsFileSystemTest, NewAppendableFile_NoObjectName) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); std::unique_ptr<WritableFile> file; EXPECT_EQ(errors::Code::INVALID_ARGUMENT, @@ -504,7 +619,7 @@ TEST(GcsFileSystemTest, NewReadOnlyMemoryRegionFromFile) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); std::unique_ptr<ReadOnlyMemoryRegion> region; TF_EXPECT_OK(fs.NewReadOnlyMemoryRegionFromFile( @@ -520,7 +635,7 @@ TEST(GcsFileSystemTest, NewReadOnlyMemoryRegionFromFile_NoObjectName) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); std::unique_ptr<ReadOnlyMemoryRegion> region; EXPECT_EQ(errors::Code::INVALID_ARGUMENT, @@ -538,7 +653,7 @@ TEST(GcsFileSystemTest, FileExists_YesAsObject) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); TF_EXPECT_OK(fs.FileExists("gs://bucket/path/file1.txt")); } @@ -561,7 +676,7 @@ TEST(GcsFileSystemTest, FileExists_YesAsFolder) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); TF_EXPECT_OK(fs.FileExists("gs://bucket/path/subfolder")); } @@ -580,7 +695,7 @@ TEST(GcsFileSystemTest, FileExists_YesAsBucket) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); TF_EXPECT_OK(fs.FileExists("gs://bucket1")); TF_EXPECT_OK(fs.FileExists("gs://bucket1/")); @@ -603,7 +718,7 @@ TEST(GcsFileSystemTest, FileExists_NotAsObjectOrFolder) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); EXPECT_EQ(errors::Code::NOT_FOUND, fs.FileExists("gs://bucket/path/file1.txt").code()); @@ -623,7 +738,7 @@ TEST(GcsFileSystemTest, FileExists_NotAsBucket) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); EXPECT_EQ(errors::Code::INVALID_ARGUMENT, fs.FileExists("gs://bucket2/").code()); EXPECT_EQ(errors::Code::INVALID_ARGUMENT, @@ -641,7 +756,7 @@ TEST(GcsFileSystemTest, GetChildren_NoItems) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); std::vector<string> children; TF_EXPECT_OK(fs.GetChildren("gs://bucket/path/", &children)); @@ -663,7 +778,7 @@ TEST(GcsFileSystemTest, GetChildren_ThreeFiles) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); std::vector<string> children; TF_EXPECT_OK(fs.GetChildren("gs://bucket/path/", &children)); @@ -686,7 +801,7 @@ TEST(GcsFileSystemTest, GetChildren_SelfDirectoryMarker) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); std::vector<string> children; TF_EXPECT_OK(fs.GetChildren("gs://bucket/path/", &children)); @@ -708,7 +823,7 @@ TEST(GcsFileSystemTest, GetChildren_ThreeFiles_NoSlash) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); std::vector<string> children; TF_EXPECT_OK(fs.GetChildren("gs://bucket/path", &children)); @@ -727,7 +842,7 @@ TEST(GcsFileSystemTest, GetChildren_Root) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); std::vector<string> children; TF_EXPECT_OK(fs.GetChildren("gs://bucket-a-b-c", &children)); @@ -746,7 +861,7 @@ TEST(GcsFileSystemTest, GetChildren_Empty) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); std::vector<string> children; TF_EXPECT_OK(fs.GetChildren("gs://bucket/path/", &children)); @@ -780,7 +895,7 @@ TEST(GcsFileSystemTest, GetChildren_Pagination) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); std::vector<string> children; TF_EXPECT_OK(fs.GetChildren("gs://bucket/path", &children)); @@ -801,7 +916,7 @@ TEST(GcsFileSystemTest, GetMatchingPaths_NoWildcard) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); std::vector<string> result; TF_EXPECT_OK( @@ -823,7 +938,7 @@ TEST(GcsFileSystemTest, GetMatchingPaths_BucketAndWildcard) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); std::vector<string> result; TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/*/*", &result)); @@ -846,7 +961,7 @@ TEST(GcsFileSystemTest, GetMatchingPaths_FolderAndWildcard_Matches) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); std::vector<string> result; TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/*/file2.txt", &result)); @@ -866,7 +981,7 @@ TEST(GcsFileSystemTest, GetMatchingPaths_SelfDirectoryMarker) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); std::vector<string> result; TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/*", &result)); @@ -886,7 +1001,7 @@ TEST(GcsFileSystemTest, GetMatchingPaths_FolderAndWildcard_NoMatches) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); std::vector<string> result; TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/*/file3.txt", &result)); @@ -899,7 +1014,7 @@ TEST(GcsFileSystemTest, GetMatchingPaths_OnlyWildcard) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); std::vector<string> result; EXPECT_EQ(errors::Code::INVALID_ARGUMENT, @@ -917,7 +1032,7 @@ TEST(GcsFileSystemTest, DeleteFile) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); TF_EXPECT_OK(fs.DeleteFile("gs://bucket/path/file1.txt")); } @@ -928,7 +1043,7 @@ TEST(GcsFileSystemTest, DeleteFile_NoObjectName) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); EXPECT_EQ(errors::Code::INVALID_ARGUMENT, fs.DeleteFile("gs://bucket/").code()); @@ -944,7 +1059,7 @@ TEST(GcsFileSystemTest, DeleteDir_Empty) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); TF_EXPECT_OK(fs.DeleteDir("gs://bucket/path/")); } @@ -966,7 +1081,7 @@ TEST(GcsFileSystemTest, DeleteDir_OnlyDirMarkerLeft) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); TF_EXPECT_OK(fs.DeleteDir("gs://bucket/path/")); } @@ -980,7 +1095,7 @@ TEST(GcsFileSystemTest, DeleteDir_BucketOnly) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); TF_EXPECT_OK(fs.DeleteDir("gs://bucket")); } @@ -996,7 +1111,7 @@ TEST(GcsFileSystemTest, DeleteDir_NonEmpty) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); EXPECT_EQ(error::Code::FAILED_PRECONDITION, fs.DeleteDir("gs://bucket/path/").code()); @@ -1013,7 +1128,7 @@ TEST(GcsFileSystemTest, GetFileSize) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); uint64 size; TF_EXPECT_OK(fs.GetFileSize("gs://bucket/file.txt", &size)); @@ -1026,7 +1141,7 @@ TEST(GcsFileSystemTest, GetFileSize_NoObjectName) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); uint64 size; EXPECT_EQ(errors::Code::INVALID_ARGUMENT, @@ -1099,7 +1214,7 @@ TEST(GcsFileSystemTest, RenameFile_Folder) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); TF_EXPECT_OK(fs.RenameFile("gs://bucket/path1", "gs://bucket/path2/")); } @@ -1138,7 +1253,7 @@ TEST(GcsFileSystemTest, RenameFile_Object) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); TF_EXPECT_OK( fs.RenameFile("gs://bucket/path/src.txt", "gs://bucket/path/dst.txt")); @@ -1186,7 +1301,7 @@ TEST(GcsFileSystemTest, RenameFile_Object_DeletionRetried) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); TF_EXPECT_OK( fs.RenameFile("gs://bucket/path/src.txt", "gs://bucket/path/dst.txt")); @@ -1220,7 +1335,7 @@ TEST(GcsFileSystemTest, RenameFile_Object_Incomplete) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); EXPECT_EQ( errors::Code::UNIMPLEMENTED, @@ -1239,7 +1354,7 @@ TEST(GcsFileSystemTest, Stat_Object) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); FileStatistics stat; TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", &stat)); @@ -1266,7 +1381,7 @@ TEST(GcsFileSystemTest, Stat_Folder) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); FileStatistics stat; TF_EXPECT_OK(fs.Stat("gs://bucket/subfolder", &stat)); @@ -1292,7 +1407,7 @@ TEST(GcsFileSystemTest, Stat_ObjectOrFolderNotFound) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); FileStatistics stat; EXPECT_EQ(error::Code::NOT_FOUND, fs.Stat("gs://bucket/path", &stat).code()); @@ -1307,7 +1422,7 @@ TEST(GcsFileSystemTest, Stat_Bucket) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); FileStatistics stat; TF_EXPECT_OK(fs.Stat("gs://bucket/", &stat)); @@ -1325,7 +1440,7 @@ TEST(GcsFileSystemTest, Stat_BucketNotFound) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); FileStatistics stat; EXPECT_EQ(error::Code::NOT_FOUND, fs.Stat("gs://bucket/", &stat).code()); @@ -1348,7 +1463,7 @@ TEST(GcsFileSystemTest, IsDirectory_NotFound) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); EXPECT_EQ(error::Code::NOT_FOUND, fs.IsDirectory("gs://bucket/file.txt").code()); @@ -1372,7 +1487,7 @@ TEST(GcsFileSystemTest, IsDirectory_NotDirectoryButObject) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); EXPECT_EQ(error::Code::FAILED_PRECONDITION, fs.IsDirectory("gs://bucket/file.txt").code()); @@ -1396,7 +1511,7 @@ TEST(GcsFileSystemTest, IsDirectory_Yes) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); TF_EXPECT_OK(fs.IsDirectory("gs://bucket/subfolder")); TF_EXPECT_OK(fs.IsDirectory("gs://bucket/subfolder/")); @@ -1416,7 +1531,7 @@ TEST(GcsFileSystemTest, IsDirectory_Bucket) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); TF_EXPECT_OK(fs.IsDirectory("gs://bucket")); TF_EXPECT_OK(fs.IsDirectory("gs://bucket/")); @@ -1431,7 +1546,7 @@ TEST(GcsFileSystemTest, IsDirectory_BucketNotFound) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); EXPECT_EQ(error::Code::NOT_FOUND, fs.IsDirectory("gs://bucket/").code()); } @@ -1464,7 +1579,7 @@ TEST(GcsFileSystemTest, CreateDir_Folder) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); TF_EXPECT_OK(fs.CreateDir("gs://bucket/subpath")); TF_EXPECT_OK(fs.CreateDir("gs://bucket/subpath/")); @@ -1484,7 +1599,7 @@ TEST(GcsFileSystemTest, CreateDir_Bucket) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); TF_EXPECT_OK(fs.CreateDir("gs://bucket/")); TF_EXPECT_OK(fs.CreateDir("gs://bucket")); @@ -1544,7 +1659,7 @@ TEST(GcsFileSystemTest, DeleteRecursively_Ok) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); int64 undeleted_files, undeleted_dirs; TF_EXPECT_OK(fs.DeleteRecursively("gs://bucket/path", &undeleted_files, @@ -1623,7 +1738,7 @@ TEST(GcsFileSystemTest, DeleteRecursively_DeletionErrors) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); int64 undeleted_files, undeleted_dirs; TF_EXPECT_OK(fs.DeleteRecursively("gs://bucket/path", &undeleted_files, @@ -1651,7 +1766,7 @@ TEST(GcsFileSystemTest, DeleteRecursively_NotAFolder) { std::unique_ptr<HttpRequest::Factory>( new FakeHttpRequestFactory(&requests)), 0 /* block size */, 0 /* block count */, - 0 /* initial retry delay */); + 0 /* max staleness */, 0 /* initial retry delay */); int64 undeleted_files, undeleted_dirs; EXPECT_EQ(error::Code::NOT_FOUND, @@ -1667,6 +1782,7 @@ TEST(GcsFileSystemTest, OverrideCacheParameters) { GcsFileSystem fs1; EXPECT_EQ(256 * 1024 * 1024, fs1.block_size()); EXPECT_EQ(1, fs1.block_count()); + EXPECT_EQ(0, fs1.max_staleness()); // Verify legacy readahead buffer override sets block size. setenv("GCS_READAHEAD_BUFFER_SIZE_BYTES", "123456789", 1); @@ -1682,6 +1798,11 @@ TEST(GcsFileSystemTest, OverrideCacheParameters) { setenv("GCS_READ_CACHE_BLOCK_COUNT", "16", 1); GcsFileSystem fs4; EXPECT_EQ(16, fs4.block_count()); + + // Verify max staleness override. + setenv("GCS_READ_CACHE_MAX_STALENESS", "60", 1); + GcsFileSystem fs5; + EXPECT_EQ(60, fs5.max_staleness()); } } // namespace |