author    A. Unique TensorFlower <gardener@tensorflow.org>  2017-07-13 18:32:07 -0700
committer TensorFlower Gardener <gardener@tensorflow.org>   2017-07-13 18:47:50 -0700
commit a641129c9b0b801bc14da3d87d28777926b2540c (patch)
tree   00a89d05bb65baed071b45802e0b733bc960a47a
parent 3f538fadf2dedaeddc295580eaff635c98f1793a (diff)
Adds a file cache to the GCS filesystem, with configurable max staleness for file contents.

Maximum staleness is specified in seconds in the GCS_READ_CACHE_MAX_STALENESS environment variable, and defaults to 0 (indicating that staleness is not tolerated in a newly opened file). Staleness is measured from the arrival time of the first block in the file's block cache.
PiperOrigin-RevId: 161897769
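
The environment override is read once, when GcsFileSystem is constructed (see gcs_file_system.cc below), so it must be set before the filesystem object is created. A minimal usage sketch, not part of this change, assuming only the constructor, accessors, and environment variable names that appear in this diff:

#include <cstdlib>

#include "tensorflow/core/platform/cloud/gcs_file_system.h"

int main() {
  // Tolerate cached blocks for up to an hour across file close/open
  // boundaries, and enlarge the read cache to 16 blocks of 8 MB each.
  setenv("GCS_READ_CACHE_MAX_STALENESS", "3600", 1 /* overwrite */);
  setenv("GCS_READ_CACHE_BLOCK_SIZE_MB", "8", 1);
  setenv("GCS_READ_CACHE_BLOCK_COUNT", "16", 1);

  // The overrides are picked up here: max_staleness() == 3600,
  // block_size() == 8 * 1024 * 1024, block_count() == 16.
  tensorflow::GcsFileSystem fs;
  return 0;
}

Leaving GCS_READ_CACHE_MAX_STALENESS unset (or set to 0) keeps the previous behavior: each newly opened file gets its own block cache, and no blocks survive a close/open cycle.
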
-rw-r--r--  tensorflow/core/platform/cloud/BUILD                      1
-rw-r--r--  tensorflow/core/platform/cloud/file_block_cache.cc       25
-rw-r--r--  tensorflow/core/platform/cloud/file_block_cache.h        23
-rw-r--r--  tensorflow/core/platform/cloud/file_block_cache_test.cc  47
-rw-r--r--  tensorflow/core/platform/cloud/gcs_file_system.cc       169
-rw-r--r--  tensorflow/core/platform/cloud/gcs_file_system.h         19
-rw-r--r--  tensorflow/core/platform/cloud/gcs_file_system_test.cc  239
7 files changed, 383 insertions, 140 deletions
diff --git a/tensorflow/core/platform/cloud/BUILD b/tensorflow/core/platform/cloud/BUILD
index 183a447012..67cd1bb2c6 100644
--- a/tensorflow/core/platform/cloud/BUILD
+++ b/tensorflow/core/platform/cloud/BUILD
@@ -170,6 +170,7 @@ tf_cc_test(
srcs = ["file_block_cache_test.cc"],
deps = [
":file_block_cache",
+ "//tensorflow/core:lib",
"//tensorflow/core:test",
"//tensorflow/core:test_main",
],
diff --git a/tensorflow/core/platform/cloud/file_block_cache.cc b/tensorflow/core/platform/cloud/file_block_cache.cc
index b794614cd7..84ef0ea1fe 100644
--- a/tensorflow/core/platform/cloud/file_block_cache.cc
+++ b/tensorflow/core/platform/cloud/file_block_cache.cc
@@ -36,9 +36,16 @@ Status FileBlockCache::Read(uint64 offset, size_t n, std::vector<char>* out) {
if (finish < offset + n) {
finish += block_size_;
}
+ mutex_lock lock(mu_);
+ // If the oldest block arrived max_staleness_ in the past, flush the cache.
+ // Note that if max_staleness_ is 0, we don't expire any cached blocks.
+ if (max_staleness_ > 0 && timestamp_ > 0 &&
+ env_->NowSeconds() - timestamp_ > max_staleness_) {
+ TrimCache(0);
+ timestamp_ = 0;
+ }
// Now iterate through the blocks, reading them one at a time. Reads are
// locked so that only one block_fetcher call is active at any given time.
- mutex_lock lock(mu_);
for (uint64 pos = start; pos < finish; pos += block_size_) {
auto entry = block_map_.find(pos);
if (entry == block_map_.end()) {
@@ -47,10 +54,7 @@ Status FileBlockCache::Read(uint64 offset, size_t n, std::vector<char>* out) {
// time during which the cache size exceeds its desired limit. The
// tradeoff is that if the fetcher fails, the cache may evict a block
// prematurely.
- while (lru_list_.size() >= block_count_) {
- block_map_.erase(lru_list_.back());
- lru_list_.pop_back();
- }
+ TrimCache(block_count_ - 1);
std::unique_ptr<Block> block(new Block);
TF_RETURN_IF_ERROR(block_fetcher_(pos, block_size_, &block->data));
// Sanity check to detect interrupted reads leading to partial blocks: a
@@ -62,6 +66,10 @@ Status FileBlockCache::Read(uint64 offset, size_t n, std::vector<char>* out) {
return errors::FailedPrecondition("File contents are inconsistent");
}
entry = block_map_.emplace(std::make_pair(pos, std::move(block))).first;
+ if (timestamp_ == 0) {
+ // Mark the timestamp of the first block's arrival in the cache.
+ timestamp_ = env_->NowSeconds();
+ }
} else {
// Cache hit. Remove the block from the LRU list at its prior location.
lru_list_.erase(entry->second->lru_iterator);
@@ -94,4 +102,11 @@ Status FileBlockCache::Read(uint64 offset, size_t n, std::vector<char>* out) {
return Status::OK();
}
+void FileBlockCache::TrimCache(size_t size) {
+ while (lru_list_.size() > size) {
+ block_map_.erase(lru_list_.back());
+ lru_list_.pop_back();
+ }
+}
+
} // namespace tensorflow
diff --git a/tensorflow/core/platform/cloud/file_block_cache.h b/tensorflow/core/platform/cloud/file_block_cache.h
index 442b1b0319..4acc4c7fca 100644
--- a/tensorflow/core/platform/cloud/file_block_cache.h
+++ b/tensorflow/core/platform/cloud/file_block_cache.h
@@ -24,6 +24,7 @@ limitations under the License.
#include <vector>
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/core/stringpiece.h"
+#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/thread_annotations.h"
#include "tensorflow/core/platform/types.h"
@@ -44,11 +45,13 @@ class FileBlockCache {
typedef std::function<Status(uint64, size_t, std::vector<char>*)>
BlockFetcher;
- FileBlockCache(uint64 block_size, uint32 block_count,
- BlockFetcher block_fetcher)
+ FileBlockCache(uint64 block_size, uint32 block_count, uint64 max_staleness,
+ BlockFetcher block_fetcher, Env* env = Env::Default())
: block_size_(block_size),
block_count_(block_count),
- block_fetcher_(block_fetcher) {}
+ max_staleness_(max_staleness),
+ block_fetcher_(block_fetcher),
+ env_(env) {}
/// Read `n` bytes starting at `offset` into `out`. This method will return:
///
@@ -66,13 +69,20 @@ class FileBlockCache {
Status Read(uint64 offset, size_t n, std::vector<char>* out);
private:
+ /// Trim the LRU cache until its size is at most `size` blocks.
+ void TrimCache(size_t size) EXCLUSIVE_LOCKS_REQUIRED(mu_);
+
/// The size of the blocks stored in the LRU cache, as well as the size of the
/// reads from the underlying filesystem.
const uint64 block_size_;
/// The maximum number of blocks allowed in the LRU cache.
const uint32 block_count_;
+ /// The maximum staleness of any block in the LRU cache, in seconds.
+ const uint64 max_staleness_;
/// The callback to read a block from the underlying filesystem.
const BlockFetcher block_fetcher_;
+ /// The Env from which we read timestamps.
+ Env* const env_; // not owned
/// \brief A block of a file.
///
@@ -85,7 +95,7 @@ class FileBlockCache {
std::list<uint64>::iterator lru_iterator;
};
- /// Guards access to the block map and LRU list.
+ /// Guards access to the block map, LRU list, and cache timestamp.
mutex mu_;
/// The block map (map from offset in the file to Block object).
@@ -94,6 +104,11 @@ class FileBlockCache {
/// The LRU list of offsets in the file. The front of the list is the position
/// of the most recently accessed block.
std::list<uint64> lru_list_ GUARDED_BY(mu_);
+
+ /// The most recent timestamp (in seconds since epoch) at which the block map
+ /// transitioned from empty to non-empty. A value of 0 means the block map is
+ /// currently empty.
+ uint64 timestamp_ GUARDED_BY(mu_) = 0;
};
} // namespace tensorflow
diff --git a/tensorflow/core/platform/cloud/file_block_cache_test.cc b/tensorflow/core/platform/cloud/file_block_cache_test.cc
index 4f62e4ee6a..8ff02ae00b 100644
--- a/tensorflow/core/platform/cloud/file_block_cache_test.cc
+++ b/tensorflow/core/platform/cloud/file_block_cache_test.cc
@@ -16,6 +16,7 @@ limitations under the License.
#include "tensorflow/core/platform/cloud/file_block_cache.h"
#include <cstring>
#include "tensorflow/core/lib/core/status_test_util.h"
+#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/test.h"
namespace tensorflow {
@@ -34,9 +35,9 @@ TEST(FileBlockCacheTest, PassThrough) {
return Status::OK();
};
// If block_size, block_count, or both are zero, the cache is a pass-through.
- FileBlockCache cache1(1, 0, fetcher);
- FileBlockCache cache2(0, 1, fetcher);
- FileBlockCache cache3(0, 0, fetcher);
+ FileBlockCache cache1(1, 0, 0, fetcher);
+ FileBlockCache cache2(0, 1, 0, fetcher);
+ FileBlockCache cache3(0, 0, 0, fetcher);
std::vector<char> out;
TF_EXPECT_OK(cache1.Read(want_offset, want_n, &out));
EXPECT_EQ(calls, 1);
@@ -68,7 +69,7 @@ TEST(FileBlockCacheTest, BlockAlignment) {
for (uint64_t block_size = 2; block_size <= 4; block_size++) {
// Make a cache of N-byte block size (1 block) and verify that reads of
// varying offsets and lengths return correct data.
- FileBlockCache cache(block_size, 1, fetcher);
+ FileBlockCache cache(block_size, 1, 0, fetcher);
for (uint64_t offset = 0; offset < 10; offset++) {
for (size_t n = block_size - 2; n <= block_size + 2; n++) {
std::vector<char> got;
@@ -109,7 +110,7 @@ TEST(FileBlockCacheTest, CacheHits) {
return Status::OK();
};
const uint32 block_count = 256;
- FileBlockCache cache(block_size, block_count, fetcher);
+ FileBlockCache cache(block_size, block_count, 0, fetcher);
std::vector<char> out;
// The cache has space for `block_count` blocks. The loop with i = 0 should
// fill the cache, and the loop with i = 1 should be all cache hits. The
@@ -143,7 +144,7 @@ TEST(FileBlockCacheTest, OutOfRange) {
}
return Status::OK();
};
- FileBlockCache cache(block_size, 1, fetcher);
+ FileBlockCache cache(block_size, 1, 0, fetcher);
std::vector<char> out;
// Reading the first 16 bytes should be fine.
TF_EXPECT_OK(cache.Read(0, block_size, &out));
@@ -174,7 +175,7 @@ TEST(FileBlockCacheTest, Inconsistent) {
out->resize(1, 'x');
return Status::OK();
};
- FileBlockCache cache(block_size, 2, fetcher);
+ FileBlockCache cache(block_size, 2, 0, fetcher);
std::vector<char> out;
// Read the second block; this should yield an OK status and a single byte.
TF_EXPECT_OK(cache.Read(block_size, block_size, &out));
@@ -200,7 +201,7 @@ TEST(FileBlockCacheTest, LRU) {
return Status::OK();
};
const uint32 block_count = 2;
- FileBlockCache cache(block_size, block_count, fetcher);
+ FileBlockCache cache(block_size, block_count, 0, fetcher);
std::vector<char> out;
// Read blocks from the cache, and verify the LRU behavior based on the
// fetcher calls that the cache makes.
@@ -232,5 +233,35 @@ TEST(FileBlockCacheTest, LRU) {
TF_EXPECT_OK(cache.Read(0, 1, &out));
}
+TEST(FileBlockCacheTest, MaxStaleness) {
+ // This Env wrapper lets us control the NowSeconds() return value.
+ class FakeEnv : public EnvWrapper {
+ public:
+ FakeEnv() : EnvWrapper(Env::Default()) {}
+ uint64 NowSeconds() override { return now_; }
+ uint64 now_ = 1;
+ };
+ int calls = 0;
+ auto fetcher = [&calls](uint64 offset, size_t n, std::vector<char>* out) {
+ calls++;
+ out->resize(n, 'x');
+ return Status::OK();
+ };
+ std::unique_ptr<FakeEnv> env(new FakeEnv);
+ FileBlockCache cache(8, 1, 2 /* max staleness */, fetcher, env.get());
+ std::vector<char> out;
+ // Execute the first read to load the block.
+ TF_EXPECT_OK(cache.Read(0, 1, &out));
+ EXPECT_EQ(calls, 1);
+ // Now advance the clock one second at a time and redo the read. The call
+ // count should advance every 3 seconds (i.e. every time the staleness is
+ // greater than 2).
+ for (int i = 1; i <= 10; i++) {
+ env->now_ = i + 1;
+ TF_EXPECT_OK(cache.Read(0, 1, &out));
+ EXPECT_EQ(calls, 1 + i / 3);
+ }
+}
+
} // namespace
} // namespace tensorflow
diff --git a/tensorflow/core/platform/cloud/gcs_file_system.cc b/tensorflow/core/platform/cloud/gcs_file_system.cc
index 9f00735304..85def60a1e 100644
--- a/tensorflow/core/platform/cloud/gcs_file_system.cc
+++ b/tensorflow/core/platform/cloud/gcs_file_system.cc
@@ -62,6 +62,10 @@ constexpr char kBlockSize[] = "GCS_READ_CACHE_BLOCK_SIZE_MB";
// The environment variable that overrides the block count in the LRU cache of
// blocks read from GCS.
constexpr char kBlockCount[] = "GCS_READ_CACHE_BLOCK_COUNT";
+// The environment variable that overrides the maximum staleness of cached file
+// contents. Once any block of a file reaches this staleness, all cached blocks
+// will be evicted on the next read.
+constexpr char kMaxStaleness[] = "GCS_READ_CACHE_MAX_STALENESS";
// The file statistics returned by Stat() for directories.
const FileStatistics DIRECTORY_STAT(0, 0, true);
@@ -213,29 +217,19 @@ Status GetBoolValue(const Json::Value& parent, const string& name,
return Status::OK();
}
-/// A GCS-based implementation of a random access file with a read-ahead buffer.
+/// A GCS-based implementation of a random access file with an LRU block cache.
class GcsRandomAccessFile : public RandomAccessFile {
public:
- GcsRandomAccessFile(const string& bucket, const string& object,
- AuthProvider* auth_provider,
- HttpRequest::Factory* http_request_factory,
- size_t block_size, uint32 block_count)
- : bucket_(bucket),
- object_(object),
- auth_provider_(auth_provider),
- http_request_factory_(http_request_factory),
- file_block_cache_(
- block_size, block_count,
- [this](uint64 offset, size_t n, std::vector<char>* out) {
- return LoadBufferFromGCS(offset, n, out);
- }) {}
+ explicit GcsRandomAccessFile(
+ const std::shared_ptr<FileBlockCache>& file_block_cache)
+ : file_block_cache_(file_block_cache) {}
/// The implementation of reads with an LRU block cache. Thread safe.
Status Read(uint64 offset, size_t n, StringPiece* result,
char* scratch) const override {
result->clear();
std::vector<char> out;
- TF_RETURN_IF_ERROR(file_block_cache_.Read(offset, n, &out));
+ TF_RETURN_IF_ERROR(file_block_cache_->Read(offset, n, &out));
std::memcpy(scratch, out.data(), std::min(out.size(), n));
*result = StringPiece(scratch, std::min(out.size(), n));
if (result->size() < n) {
@@ -249,29 +243,8 @@ class GcsRandomAccessFile : public RandomAccessFile {
}
private:
- /// A helper function to actually read the data from GCS.
- Status LoadBufferFromGCS(uint64_t offset, size_t n, std::vector<char>* out) {
- string auth_token;
- TF_RETURN_IF_ERROR(AuthProvider::GetToken(auth_provider_, &auth_token));
-
- std::unique_ptr<HttpRequest> request(http_request_factory_->Create());
- TF_RETURN_IF_ERROR(request->Init());
- TF_RETURN_IF_ERROR(
- request->SetUri(strings::StrCat("https://", kStorageHost, "/", bucket_,
- "/", request->EscapeString(object_))));
- TF_RETURN_IF_ERROR(request->AddAuthBearerHeader(auth_token));
- TF_RETURN_IF_ERROR(request->SetRange(offset, offset + n - 1));
- TF_RETURN_IF_ERROR(request->SetResultBuffer(out));
- TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when reading gs://",
- bucket_, "/", object_);
- return Status::OK();
- }
-
- string bucket_;
- string object_;
- AuthProvider* auth_provider_;
- HttpRequest::Factory* http_request_factory_;
- mutable FileBlockCache file_block_cache_;
+ /// The LRU block cache for this file.
+ mutable std::shared_ptr<FileBlockCache> file_block_cache_;
};
/// \brief GCS-based implementation of a writeable file.
@@ -283,11 +256,13 @@ class GcsWritableFile : public WritableFile {
GcsWritableFile(const string& bucket, const string& object,
AuthProvider* auth_provider,
HttpRequest::Factory* http_request_factory,
+ std::function<void()> file_cache_erase,
int64 initial_retry_delay_usec)
: bucket_(bucket),
object_(object),
auth_provider_(auth_provider),
http_request_factory_(http_request_factory),
+ file_cache_erase_(std::move(file_cache_erase)),
sync_needed_(true),
initial_retry_delay_usec_(initial_retry_delay_usec) {
if (GetTmpFilename(&tmp_content_filename_).ok()) {
@@ -305,11 +280,13 @@ class GcsWritableFile : public WritableFile {
AuthProvider* auth_provider,
const string& tmp_content_filename,
HttpRequest::Factory* http_request_factory,
+ std::function<void()> file_cache_erase,
int64 initial_retry_delay_usec)
: bucket_(bucket),
object_(object),
auth_provider_(auth_provider),
http_request_factory_(http_request_factory),
+ file_cache_erase_(std::move(file_cache_erase)),
sync_needed_(true),
initial_retry_delay_usec_(initial_retry_delay_usec) {
tmp_content_filename_ = tmp_content_filename;
@@ -377,6 +354,8 @@ class GcsWritableFile : public WritableFile {
TF_RETURN_IF_ERROR(RequestUploadSessionStatus(
session_uri, &completed, &already_uploaded));
if (completed) {
+ // Erase the file from the file cache on every successful write.
+ file_cache_erase_();
// It's unclear why UploadToSession didn't return OK in the
// previous attempt, but GCS reports that the file is fully
// uploaded, so succeed.
@@ -529,6 +508,8 @@ class GcsWritableFile : public WritableFile {
request->SetPutFromFile(tmp_content_filename_, start_offset));
TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when uploading ",
GetGcsPath());
+ // Erase the file from the file cache on every successful write.
+ file_cache_erase_();
return Status::OK();
}
@@ -542,6 +523,7 @@ class GcsWritableFile : public WritableFile {
string tmp_content_filename_;
std::ofstream outfile_;
HttpRequest::Factory* http_request_factory_;
+ std::function<void()> file_cache_erase_;
bool sync_needed_; // whether there is buffered data that needs to be synced
int64 initial_retry_delay_usec_;
};
@@ -557,55 +539,107 @@ class GcsReadOnlyMemoryRegion : public ReadOnlyMemoryRegion {
std::unique_ptr<char[]> data_;
uint64 length_;
};
+
+// Helper function to extract an environment variable and convert it into a
+// value of type T.
+template <typename T>
+bool GetEnvVar(const char* varname, bool (*convert)(StringPiece, T*),
+ T* value) {
+ const char* env_value = std::getenv(varname);
+ if (!env_value) {
+ return false;
+ }
+ return convert(env_value, value);
+}
+
} // namespace
GcsFileSystem::GcsFileSystem()
: auth_provider_(new GoogleAuthProvider()),
http_request_factory_(new HttpRequest::Factory()) {
// Apply the sys env override for the readahead buffer size if it's provided.
- const char* readahead_buffer_size_env = std::getenv(kReadaheadBufferSize);
- if (readahead_buffer_size_env) {
- uint64 value;
- if (strings::safe_strtou64(readahead_buffer_size_env, &value)) {
- block_size_ = value;
- }
+ uint64 v64;
+ if (GetEnvVar(kReadaheadBufferSize, strings::safe_strtou64, &v64)) {
+ block_size_ = v64;
}
// Apply the override for the block size if provided. This takes precedence
// over the readahead buffer size.
- const char* block_size_env = std::getenv(kBlockSize);
- if (block_size_env) {
- uint64 value;
- if (strings::safe_strtou64(block_size_env, &value)) {
- block_size_ = value * 1024 * 1024;
- }
+ if (GetEnvVar(kBlockSize, strings::safe_strtou64, &v64)) {
+ block_size_ = v64 * 1024 * 1024;
}
// Apply the override for the block count if provided.
- const char* block_count_env = std::getenv(kBlockCount);
- if (block_count_env) {
- uint32 value;
- if (strings::safe_strtou32(block_count_env, &value)) {
- block_count_ = value;
- }
+ uint32 v32;
+ if (GetEnvVar(kBlockCount, strings::safe_strtou32, &v32)) {
+ block_count_ = v32;
+ }
+ // Apply the override for max staleness if provided.
+ if (GetEnvVar(kMaxStaleness, strings::safe_strtou64, &v64)) {
+ max_staleness_ = v64;
}
}
GcsFileSystem::GcsFileSystem(
std::unique_ptr<AuthProvider> auth_provider,
std::unique_ptr<HttpRequest::Factory> http_request_factory,
- size_t block_size, uint32 block_count, int64 initial_retry_delay_usec)
+ size_t block_size, uint32 block_count, uint64 max_staleness,
+ int64 initial_retry_delay_usec)
: auth_provider_(std::move(auth_provider)),
http_request_factory_(std::move(http_request_factory)),
block_size_(block_size),
block_count_(block_count),
+ max_staleness_(max_staleness),
initial_retry_delay_usec_(initial_retry_delay_usec) {}
Status GcsFileSystem::NewRandomAccessFile(
const string& fname, std::unique_ptr<RandomAccessFile>* result) {
string bucket, object;
TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
- result->reset(new GcsRandomAccessFile(bucket, object, auth_provider_.get(),
- http_request_factory_.get(),
- block_size_, block_count_));
+ // `file_cache_` is a container of FileBlockCache, keyed by filename. We look
+ // up the filename in this container to see if we have a FileBlockCache for
+ // it. If the FileBlockCache for `fname` exists in file_cache_, we return it.
+ // Otherwise, we create a new FileBlockCache with a block fetcher that calls
+ // GcsFileSystem::LoadBufferFromGCS for the bucket and object derived from
+ // `fname`. If a FileBlockCache is created, it is added to `file_cache_` only
+ // if `max_staleness_` > 0 (indicating that new random access files will
+ // tolerate stale reads coming from FileBlockCache instances that persist
+ // across file close/open boundaries).
+ std::shared_ptr<FileBlockCache> file_block_cache;
+ mutex_lock lock(mu_);
+ auto entry = file_cache_.find(fname);
+ if (entry == file_cache_.end()) {
+ file_block_cache.reset(new FileBlockCache(
+ block_size_, block_count_, max_staleness_,
+ [this, bucket, object](uint64 offset, size_t n,
+ std::vector<char>* out) {
+ return LoadBufferFromGCS(bucket, object, offset, n, out);
+ }));
+ if (max_staleness_ > 0) {
+ file_cache_[fname] = file_block_cache;
+ }
+ } else {
+ file_block_cache = entry->second;
+ }
+ result->reset(new GcsRandomAccessFile(file_block_cache));
+ return Status::OK();
+}
+
+// A helper function to actually read the data from GCS.
+Status GcsFileSystem::LoadBufferFromGCS(const string& bucket,
+ const string& object, uint64_t offset,
+ size_t n, std::vector<char>* out) {
+ string auth_token;
+ TF_RETURN_IF_ERROR(AuthProvider::GetToken(auth_provider_.get(), &auth_token));
+
+ std::unique_ptr<HttpRequest> request(http_request_factory_->Create());
+ TF_RETURN_IF_ERROR(request->Init());
+ TF_RETURN_IF_ERROR(
+ request->SetUri(strings::StrCat("https://", kStorageHost, "/", bucket,
+ "/", request->EscapeString(object))));
+ TF_RETURN_IF_ERROR(request->AddAuthBearerHeader(auth_token));
+ TF_RETURN_IF_ERROR(request->SetRange(offset, offset + n - 1));
+ TF_RETURN_IF_ERROR(request->SetResultBuffer(out));
+ TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when reading gs://",
+ bucket, "/", object);
return Status::OK();
}
@@ -615,6 +649,10 @@ Status GcsFileSystem::NewWritableFile(const string& fname,
TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
result->reset(new GcsWritableFile(bucket, object, auth_provider_.get(),
http_request_factory_.get(),
+ [this, fname]() {
+ mutex_lock lock(mu_);
+ file_cache_.erase(fname);
+ },
initial_retry_delay_usec_));
return Status::OK();
}
@@ -653,9 +691,14 @@ Status GcsFileSystem::NewAppendableFile(const string& fname,
// Create a writable file and pass the old content to it.
string bucket, object;
TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
- result->reset(new GcsWritableFile(
- bucket, object, auth_provider_.get(), old_content_filename,
- http_request_factory_.get(), initial_retry_delay_usec_));
+ result->reset(new GcsWritableFile(bucket, object, auth_provider_.get(),
+ old_content_filename,
+ http_request_factory_.get(),
+ [this, fname]() {
+ mutex_lock lock(mu_);
+ file_cache_.erase(fname);
+ },
+ initial_retry_delay_usec_));
return Status::OK();
}
diff --git a/tensorflow/core/platform/cloud/gcs_file_system.h b/tensorflow/core/platform/cloud/gcs_file_system.h
index ef8899892e..6030b16aa0 100644
--- a/tensorflow/core/platform/cloud/gcs_file_system.h
+++ b/tensorflow/core/platform/cloud/gcs_file_system.h
@@ -20,6 +20,7 @@ limitations under the License.
#include <vector>
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/platform/cloud/auth_provider.h"
+#include "tensorflow/core/platform/cloud/file_block_cache.h"
#include "tensorflow/core/platform/cloud/http_request.h"
#include "tensorflow/core/platform/cloud/retrying_file_system.h"
#include "tensorflow/core/platform/file_system.h"
@@ -35,7 +36,7 @@ class GcsFileSystem : public FileSystem {
GcsFileSystem();
GcsFileSystem(std::unique_ptr<AuthProvider> auth_provider,
std::unique_ptr<HttpRequest::Factory> http_request_factory,
- size_t block_size, uint32 block_count,
+ size_t block_size, uint32 block_count, uint64 max_staleness,
int64 initial_retry_delay_usec);
Status NewRandomAccessFile(
@@ -77,6 +78,7 @@ class GcsFileSystem : public FileSystem {
int64* undeleted_dirs) override;
size_t block_size() const { return block_size_; }
uint32 block_count() const { return block_count_; }
+ uint64 max_staleness() const { return max_staleness_; }
private:
/// \brief Checks if the bucket exists. Returns OK if the check succeeded.
@@ -110,9 +112,19 @@ class GcsFileSystem : public FileSystem {
FileStatistics* stat);
Status RenameObject(const string& src, const string& target);
+ /// Loads file contents from GCS for a given bucket and object.
+ Status LoadBufferFromGCS(const string& bucket, const string& object,
+ uint64_t offset, size_t n, std::vector<char>* out);
+
std::unique_ptr<AuthProvider> auth_provider_;
std::unique_ptr<HttpRequest::Factory> http_request_factory_;
+ /// Guards access to the file cache.
+ mutex mu_;
+
+ /// Cache from filename to FileBlockCache. Populated iff max_staleness_ > 0.
+ std::map<string, std::shared_ptr<FileBlockCache>> file_cache_ GUARDED_BY(mu_);
+
/// The block size for block-aligned reads in the RandomAccessFile
/// implementation. Defaults to 256Mb.
size_t block_size_ = 256 * 1024 * 1024;
@@ -121,6 +133,11 @@ class GcsFileSystem : public FileSystem {
/// implementation. Defaults to 1.
uint32 block_count_ = 1;
+ /// The maximum staleness of cached file blocks across close/open boundaries.
+ /// Defaults to 0, meaning that file blocks do not persist across close/open
+ /// boundaries.
+ uint64 max_staleness_ = 0;
+
/// The initial delay for exponential backoffs when retrying failed calls.
const int64 initial_retry_delay_usec_ = 1000000L;
diff --git a/tensorflow/core/platform/cloud/gcs_file_system_test.cc b/tensorflow/core/platform/cloud/gcs_file_system_test.cc
index defedfdb93..915a44ad7f 100644
--- a/tensorflow/core/platform/cloud/gcs_file_system_test.cc
+++ b/tensorflow/core/platform/cloud/gcs_file_system_test.cc
@@ -46,7 +46,7 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_NoBlockCache) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
std::unique_ptr<RandomAccessFile> file;
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
@@ -81,7 +81,7 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_NoBlockCache_differentN) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
std::unique_ptr<RandomAccessFile> file;
TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/random_access.txt", &file));
@@ -131,7 +131,7 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_WithBlockCache) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
9 /* block size */, 2 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
char scratch[100];
StringPiece result;
@@ -181,19 +181,134 @@ TEST(GcsFileSystemTest, NewRandomAccessFile_WithBlockCache) {
EXPECT_EQ("0123", result);
}
+TEST(GcsFileSystemTest, NewRandomAccessFile_WithBlockCache_MaxStaleness) {
+ // Our underlying file in this test is a 16 byte file with contents
+ // "0123456789abcdef".
+ std::vector<HttpRequest*> requests(
+ {new FakeHttpRequest("Uri: https://storage.googleapis.com/bucket/object\n"
+ "Auth Token: fake_token\n"
+ "Range: 0-7\n",
+ "01234567"),
+ new FakeHttpRequest("Uri: https://storage.googleapis.com/bucket/object\n"
+ "Auth Token: fake_token\n"
+ "Range: 8-15\n",
+ "89abcdef")});
+ GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider),
+ std::unique_ptr<HttpRequest::Factory>(
+ new FakeHttpRequestFactory(&requests)),
+ 8 /* block size */, 2 /* block count */,
+ 3600 /* max staleness */, 0 /* initial retry delay */);
+ char scratch[100];
+ StringPiece result;
+ // There should only be two HTTP requests issued to GCS even though we iterate
+ // this loop 10 times. This shows that the underlying FileBlockCache persists
+ // across file close/open boundaries.
+ for (int i = 0; i < 10; i++) {
+ // Create two files. Since these files have the same name and the max
+ // staleness of the filesystem is > 0, they will share the same block cache.
+ std::unique_ptr<RandomAccessFile> file1;
+ std::unique_ptr<RandomAccessFile> file2;
+ TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/object", &file1));
+ TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/object", &file2));
+ // Reading the first block from file1 should load it once.
+ TF_EXPECT_OK(file1->Read(0, 8, &result, scratch));
+ EXPECT_EQ("01234567", result);
+ // Reading the first block from file2 should not trigger a request to load
+ // the first block again, because the FileBlockCache shared by file1 and
+ // file2 already has the first block.
+ TF_EXPECT_OK(file2->Read(0, 8, &result, scratch));
+ EXPECT_EQ("01234567", result);
+ // Reading the second block from file2 should load it once.
+ TF_EXPECT_OK(file2->Read(8, 8, &result, scratch));
+ EXPECT_EQ("89abcdef", result);
+ // Reading the second block from file1 should not trigger a request to load
+ // the second block again, because the FileBlockCache shared by file1 and
+ // file2 already has the second block.
+ TF_EXPECT_OK(file1->Read(8, 8, &result, scratch));
+ EXPECT_EQ("89abcdef", result);
+ }
+}
+
TEST(GcsFileSystemTest, NewRandomAccessFile_NoObjectName) {
std::vector<HttpRequest*> requests;
GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider),
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* read ahead bytes */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
std::unique_ptr<RandomAccessFile> file;
EXPECT_EQ(errors::Code::INVALID_ARGUMENT,
fs.NewRandomAccessFile("gs://bucket/", &file).code());
}
+TEST(GcsFileSystemTest, NewWritableFile_FlushesFileCache) {
+ // Our underlying file in this test is a 16 byte file with initial contents
+ // "0123456789abcdef", replaced by new contents "08192a3b4c5d6e7f".
+ std::vector<HttpRequest*> requests(
+ {new FakeHttpRequest("Uri: https://storage.googleapis.com/bucket/object\n"
+ "Auth Token: fake_token\n"
+ "Range: 0-7\n",
+ "01234567"),
+ new FakeHttpRequest("Uri: https://storage.googleapis.com/bucket/object\n"
+ "Auth Token: fake_token\n"
+ "Range: 8-15\n",
+ "89abcdef"),
+ new FakeHttpRequest(
+ "Uri: https://www.googleapis.com/upload/storage/v1/b/bucket/o?"
+ "uploadType=resumable&name=object\n"
+ "Auth Token: fake_token\n"
+ "Header X-Upload-Content-Length: 16\n"
+ "Post: yes\n",
+ "", {{"Location", "https://custom/upload/location"}}),
+ new FakeHttpRequest("Uri: https://custom/upload/location\n"
+ "Auth Token: fake_token\n"
+ "Header Content-Range: bytes 0-15/16\n"
+ "Put body: 08192a3b4c5d6e7f\n",
+ ""),
+ new FakeHttpRequest("Uri: https://storage.googleapis.com/bucket/object\n"
+ "Auth Token: fake_token\n"
+ "Range: 0-7\n",
+ "08192a3b"),
+ new FakeHttpRequest("Uri: https://storage.googleapis.com/bucket/object\n"
+ "Auth Token: fake_token\n"
+ "Range: 8-15\n",
+ "4c5d6e7f")});
+ GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider),
+ std::unique_ptr<HttpRequest::Factory>(
+ new FakeHttpRequestFactory(&requests)),
+ 8 /* block size */, 2 /* block count */,
+ 3600 /* max staleness */, 0 /* initial retry delay */);
+ // First, read both blocks of the file with its initial contents. This should
+ // trigger the first two HTTP requests to GCS, to load each of the blocks in
+ // order.
+ std::unique_ptr<RandomAccessFile> rfile;
+ TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/object", &rfile));
+ char scratch[100];
+ StringPiece result;
+ TF_EXPECT_OK(rfile->Read(0, 16, &result, scratch));
+ EXPECT_EQ("0123456789abcdef", result);
+ // Now open a writable file and write the new file contents. This should
+ // trigger the next two HTTP requests to GCS.
+ std::unique_ptr<WritableFile> wfile;
+ TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/object", &wfile));
+ TF_EXPECT_OK(wfile->Append("08192a3b4c5d6e7f"));
+ // This rfile read should hit the block cache and not trigger any requests,
+ // because we haven't flushed the write yet.
+ TF_EXPECT_OK(rfile->Read(0, 16, &result, scratch));
+ EXPECT_EQ("0123456789abcdef", result);
+ // Now flush the write. Any opens of gs://bucket/object after this point will
+ // no longer use the previous FileBlockCache for that file.
+ TF_EXPECT_OK(wfile->Flush());
+ // Reopen `rfile` and read its contents again. This will trigger the next two
+ // HTTP requests to GCS, which will return the (updated) file contents. Note
+ // that the contents returned in those HTTP requests are not really relevant;
+ // the main thing is that both blocks of the file are reloaded.
+ TF_EXPECT_OK(fs.NewRandomAccessFile("gs://bucket/object", &rfile));
+ TF_EXPECT_OK(rfile->Read(0, 16, &result, scratch));
+ EXPECT_EQ("08192a3b4c5d6e7f", result);
+}
+
TEST(GcsFileSystemTest, NewWritableFile) {
std::vector<HttpRequest*> requests(
{new FakeHttpRequest(
@@ -212,7 +327,7 @@ TEST(GcsFileSystemTest, NewWritableFile) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
std::unique_ptr<WritableFile> file;
TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/path/writeable.txt", &file));
@@ -267,7 +382,7 @@ TEST(GcsFileSystemTest, NewWritableFile_ResumeUploadSucceeds) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
std::unique_ptr<WritableFile> file;
TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/path/writeable.txt", &file));
@@ -300,7 +415,7 @@ TEST(GcsFileSystemTest, NewWritableFile_ResumeUploadSucceedsOnGetStatus) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
std::unique_ptr<WritableFile> file;
TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/path/writeable.txt", &file));
@@ -358,7 +473,7 @@ TEST(GcsFileSystemTest, NewWritableFile_ResumeUploadAllAttemptsFail) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 2 /* initial retry delay */);
+ 0 /* max staleness */, 2 /* initial retry delay */);
std::unique_ptr<WritableFile> file;
TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/path/writeable.txt", &file));
@@ -406,7 +521,7 @@ TEST(GcsFileSystemTest, NewWritableFile_UploadReturns410) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
std::unique_ptr<WritableFile> file;
TF_EXPECT_OK(fs.NewWritableFile("gs://bucket/path/writeable.txt", &file));
@@ -432,7 +547,7 @@ TEST(GcsFileSystemTest, NewWritableFile_NoObjectName) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
std::unique_ptr<WritableFile> file;
EXPECT_EQ(errors::Code::INVALID_ARGUMENT,
@@ -462,7 +577,7 @@ TEST(GcsFileSystemTest, NewAppendableFile) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
std::unique_ptr<WritableFile> file;
TF_EXPECT_OK(fs.NewAppendableFile("gs://bucket/path/appendable.txt", &file));
@@ -477,7 +592,7 @@ TEST(GcsFileSystemTest, NewAppendableFile_NoObjectName) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
std::unique_ptr<WritableFile> file;
EXPECT_EQ(errors::Code::INVALID_ARGUMENT,
@@ -504,7 +619,7 @@ TEST(GcsFileSystemTest, NewReadOnlyMemoryRegionFromFile) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
std::unique_ptr<ReadOnlyMemoryRegion> region;
TF_EXPECT_OK(fs.NewReadOnlyMemoryRegionFromFile(
@@ -520,7 +635,7 @@ TEST(GcsFileSystemTest, NewReadOnlyMemoryRegionFromFile_NoObjectName) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
std::unique_ptr<ReadOnlyMemoryRegion> region;
EXPECT_EQ(errors::Code::INVALID_ARGUMENT,
@@ -538,7 +653,7 @@ TEST(GcsFileSystemTest, FileExists_YesAsObject) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
TF_EXPECT_OK(fs.FileExists("gs://bucket/path/file1.txt"));
}
@@ -561,7 +676,7 @@ TEST(GcsFileSystemTest, FileExists_YesAsFolder) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
TF_EXPECT_OK(fs.FileExists("gs://bucket/path/subfolder"));
}
@@ -580,7 +695,7 @@ TEST(GcsFileSystemTest, FileExists_YesAsBucket) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
TF_EXPECT_OK(fs.FileExists("gs://bucket1"));
TF_EXPECT_OK(fs.FileExists("gs://bucket1/"));
@@ -603,7 +718,7 @@ TEST(GcsFileSystemTest, FileExists_NotAsObjectOrFolder) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
EXPECT_EQ(errors::Code::NOT_FOUND,
fs.FileExists("gs://bucket/path/file1.txt").code());
@@ -623,7 +738,7 @@ TEST(GcsFileSystemTest, FileExists_NotAsBucket) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
EXPECT_EQ(errors::Code::INVALID_ARGUMENT,
fs.FileExists("gs://bucket2/").code());
EXPECT_EQ(errors::Code::INVALID_ARGUMENT,
@@ -641,7 +756,7 @@ TEST(GcsFileSystemTest, GetChildren_NoItems) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
std::vector<string> children;
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path/", &children));
@@ -663,7 +778,7 @@ TEST(GcsFileSystemTest, GetChildren_ThreeFiles) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
std::vector<string> children;
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path/", &children));
@@ -686,7 +801,7 @@ TEST(GcsFileSystemTest, GetChildren_SelfDirectoryMarker) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
std::vector<string> children;
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path/", &children));
@@ -708,7 +823,7 @@ TEST(GcsFileSystemTest, GetChildren_ThreeFiles_NoSlash) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
std::vector<string> children;
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path", &children));
@@ -727,7 +842,7 @@ TEST(GcsFileSystemTest, GetChildren_Root) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
std::vector<string> children;
TF_EXPECT_OK(fs.GetChildren("gs://bucket-a-b-c", &children));
@@ -746,7 +861,7 @@ TEST(GcsFileSystemTest, GetChildren_Empty) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
std::vector<string> children;
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path/", &children));
@@ -780,7 +895,7 @@ TEST(GcsFileSystemTest, GetChildren_Pagination) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
std::vector<string> children;
TF_EXPECT_OK(fs.GetChildren("gs://bucket/path", &children));
@@ -801,7 +916,7 @@ TEST(GcsFileSystemTest, GetMatchingPaths_NoWildcard) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
std::vector<string> result;
TF_EXPECT_OK(
@@ -823,7 +938,7 @@ TEST(GcsFileSystemTest, GetMatchingPaths_BucketAndWildcard) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
std::vector<string> result;
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/*/*", &result));
@@ -846,7 +961,7 @@ TEST(GcsFileSystemTest, GetMatchingPaths_FolderAndWildcard_Matches) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
std::vector<string> result;
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/*/file2.txt", &result));
@@ -866,7 +981,7 @@ TEST(GcsFileSystemTest, GetMatchingPaths_SelfDirectoryMarker) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
std::vector<string> result;
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/*", &result));
@@ -886,7 +1001,7 @@ TEST(GcsFileSystemTest, GetMatchingPaths_FolderAndWildcard_NoMatches) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
std::vector<string> result;
TF_EXPECT_OK(fs.GetMatchingPaths("gs://bucket/path/*/file3.txt", &result));
@@ -899,7 +1014,7 @@ TEST(GcsFileSystemTest, GetMatchingPaths_OnlyWildcard) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
std::vector<string> result;
EXPECT_EQ(errors::Code::INVALID_ARGUMENT,
@@ -917,7 +1032,7 @@ TEST(GcsFileSystemTest, DeleteFile) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
TF_EXPECT_OK(fs.DeleteFile("gs://bucket/path/file1.txt"));
}
@@ -928,7 +1043,7 @@ TEST(GcsFileSystemTest, DeleteFile_NoObjectName) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
EXPECT_EQ(errors::Code::INVALID_ARGUMENT,
fs.DeleteFile("gs://bucket/").code());
@@ -944,7 +1059,7 @@ TEST(GcsFileSystemTest, DeleteDir_Empty) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
TF_EXPECT_OK(fs.DeleteDir("gs://bucket/path/"));
}
@@ -966,7 +1081,7 @@ TEST(GcsFileSystemTest, DeleteDir_OnlyDirMarkerLeft) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
TF_EXPECT_OK(fs.DeleteDir("gs://bucket/path/"));
}
@@ -980,7 +1095,7 @@ TEST(GcsFileSystemTest, DeleteDir_BucketOnly) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
TF_EXPECT_OK(fs.DeleteDir("gs://bucket"));
}
@@ -996,7 +1111,7 @@ TEST(GcsFileSystemTest, DeleteDir_NonEmpty) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
EXPECT_EQ(error::Code::FAILED_PRECONDITION,
fs.DeleteDir("gs://bucket/path/").code());
@@ -1013,7 +1128,7 @@ TEST(GcsFileSystemTest, GetFileSize) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
uint64 size;
TF_EXPECT_OK(fs.GetFileSize("gs://bucket/file.txt", &size));
@@ -1026,7 +1141,7 @@ TEST(GcsFileSystemTest, GetFileSize_NoObjectName) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
uint64 size;
EXPECT_EQ(errors::Code::INVALID_ARGUMENT,
@@ -1099,7 +1214,7 @@ TEST(GcsFileSystemTest, RenameFile_Folder) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
TF_EXPECT_OK(fs.RenameFile("gs://bucket/path1", "gs://bucket/path2/"));
}
@@ -1138,7 +1253,7 @@ TEST(GcsFileSystemTest, RenameFile_Object) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
TF_EXPECT_OK(
fs.RenameFile("gs://bucket/path/src.txt", "gs://bucket/path/dst.txt"));
@@ -1186,7 +1301,7 @@ TEST(GcsFileSystemTest, RenameFile_Object_DeletionRetried) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
TF_EXPECT_OK(
fs.RenameFile("gs://bucket/path/src.txt", "gs://bucket/path/dst.txt"));
@@ -1220,7 +1335,7 @@ TEST(GcsFileSystemTest, RenameFile_Object_Incomplete) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
EXPECT_EQ(
errors::Code::UNIMPLEMENTED,
@@ -1239,7 +1354,7 @@ TEST(GcsFileSystemTest, Stat_Object) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
FileStatistics stat;
TF_EXPECT_OK(fs.Stat("gs://bucket/file.txt", &stat));
@@ -1266,7 +1381,7 @@ TEST(GcsFileSystemTest, Stat_Folder) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
FileStatistics stat;
TF_EXPECT_OK(fs.Stat("gs://bucket/subfolder", &stat));
@@ -1292,7 +1407,7 @@ TEST(GcsFileSystemTest, Stat_ObjectOrFolderNotFound) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
FileStatistics stat;
EXPECT_EQ(error::Code::NOT_FOUND, fs.Stat("gs://bucket/path", &stat).code());
@@ -1307,7 +1422,7 @@ TEST(GcsFileSystemTest, Stat_Bucket) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
FileStatistics stat;
TF_EXPECT_OK(fs.Stat("gs://bucket/", &stat));
@@ -1325,7 +1440,7 @@ TEST(GcsFileSystemTest, Stat_BucketNotFound) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
FileStatistics stat;
EXPECT_EQ(error::Code::NOT_FOUND, fs.Stat("gs://bucket/", &stat).code());
@@ -1348,7 +1463,7 @@ TEST(GcsFileSystemTest, IsDirectory_NotFound) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
EXPECT_EQ(error::Code::NOT_FOUND,
fs.IsDirectory("gs://bucket/file.txt").code());
@@ -1372,7 +1487,7 @@ TEST(GcsFileSystemTest, IsDirectory_NotDirectoryButObject) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
EXPECT_EQ(error::Code::FAILED_PRECONDITION,
fs.IsDirectory("gs://bucket/file.txt").code());
@@ -1396,7 +1511,7 @@ TEST(GcsFileSystemTest, IsDirectory_Yes) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
TF_EXPECT_OK(fs.IsDirectory("gs://bucket/subfolder"));
TF_EXPECT_OK(fs.IsDirectory("gs://bucket/subfolder/"));
@@ -1416,7 +1531,7 @@ TEST(GcsFileSystemTest, IsDirectory_Bucket) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
TF_EXPECT_OK(fs.IsDirectory("gs://bucket"));
TF_EXPECT_OK(fs.IsDirectory("gs://bucket/"));
@@ -1431,7 +1546,7 @@ TEST(GcsFileSystemTest, IsDirectory_BucketNotFound) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
EXPECT_EQ(error::Code::NOT_FOUND, fs.IsDirectory("gs://bucket/").code());
}
@@ -1464,7 +1579,7 @@ TEST(GcsFileSystemTest, CreateDir_Folder) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
TF_EXPECT_OK(fs.CreateDir("gs://bucket/subpath"));
TF_EXPECT_OK(fs.CreateDir("gs://bucket/subpath/"));
@@ -1484,7 +1599,7 @@ TEST(GcsFileSystemTest, CreateDir_Bucket) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
TF_EXPECT_OK(fs.CreateDir("gs://bucket/"));
TF_EXPECT_OK(fs.CreateDir("gs://bucket"));
@@ -1544,7 +1659,7 @@ TEST(GcsFileSystemTest, DeleteRecursively_Ok) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
int64 undeleted_files, undeleted_dirs;
TF_EXPECT_OK(fs.DeleteRecursively("gs://bucket/path", &undeleted_files,
@@ -1623,7 +1738,7 @@ TEST(GcsFileSystemTest, DeleteRecursively_DeletionErrors) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
int64 undeleted_files, undeleted_dirs;
TF_EXPECT_OK(fs.DeleteRecursively("gs://bucket/path", &undeleted_files,
@@ -1651,7 +1766,7 @@ TEST(GcsFileSystemTest, DeleteRecursively_NotAFolder) {
std::unique_ptr<HttpRequest::Factory>(
new FakeHttpRequestFactory(&requests)),
0 /* block size */, 0 /* block count */,
- 0 /* initial retry delay */);
+ 0 /* max staleness */, 0 /* initial retry delay */);
int64 undeleted_files, undeleted_dirs;
EXPECT_EQ(error::Code::NOT_FOUND,
@@ -1667,6 +1782,7 @@ TEST(GcsFileSystemTest, OverrideCacheParameters) {
GcsFileSystem fs1;
EXPECT_EQ(256 * 1024 * 1024, fs1.block_size());
EXPECT_EQ(1, fs1.block_count());
+ EXPECT_EQ(0, fs1.max_staleness());
// Verify legacy readahead buffer override sets block size.
setenv("GCS_READAHEAD_BUFFER_SIZE_BYTES", "123456789", 1);
@@ -1682,6 +1798,11 @@ TEST(GcsFileSystemTest, OverrideCacheParameters) {
setenv("GCS_READ_CACHE_BLOCK_COUNT", "16", 1);
GcsFileSystem fs4;
EXPECT_EQ(16, fs4.block_count());
+
+ // Verify max staleness override.
+ setenv("GCS_READ_CACHE_MAX_STALENESS", "60", 1);
+ GcsFileSystem fs5;
+ EXPECT_EQ(60, fs5.max_staleness());
}
} // namespace