diff options
author | 2016-06-24 08:08:50 -0800 | |
---|---|---|
committer | 2016-06-24 09:18:06 -0700 | |
commit | 3854ea49a44de3eb2e3eb225b9e35fbbd3a965d8 (patch) | |
tree | 79fb2a909ae2df104170e4db253f011874eea0a4 /tensorflow/core | |
parent | 4994af33fd8d79402d581e5f35be0f796e8028c1 (diff) |
Minor improvements to GcsFileSystem.
- adds support of paths pointing to the root of a bucket, e.g gs://bucket
- adds pagination support for GetChildren
- makes a more optimal HTTP request in GetChildren
Change: 125785863
Diffstat (limited to 'tensorflow/core')
-rw-r--r-- | tensorflow/core/platform/cloud/gcs_file_system.cc | 152 | ||||
-rw-r--r-- | tensorflow/core/platform/cloud/gcs_file_system_test.cc | 80 |
2 files changed, 160 insertions, 72 deletions
diff --git a/tensorflow/core/platform/cloud/gcs_file_system.cc b/tensorflow/core/platform/cloud/gcs_file_system.cc index b9c9a04397..dc04939161 100644 --- a/tensorflow/core/platform/cloud/gcs_file_system.cc +++ b/tensorflow/core/platform/cloud/gcs_file_system.cc @@ -27,7 +27,6 @@ limitations under the License. #include "tensorflow/core/lib/gtl/map_util.h" #include "tensorflow/core/lib/gtl/stl_util.h" #include "tensorflow/core/lib/strings/numbers.h" -#include "tensorflow/core/lib/strings/scanner.h" #include "tensorflow/core/lib/strings/str_util.h" #include "tensorflow/core/platform/cloud/google_auth_provider.h" #include "tensorflow/core/platform/env.h" @@ -62,22 +61,23 @@ Status GetTmpFilename(string* filename) { /// /// For example, "gs://bucket-name/path/to/file.txt" gets split into /// "bucket-name" and "path/to/file.txt". -Status ParseGcsPath(const string& fname, string* bucket, string* object) { +/// If fname only contains the bucket, the returned object is empty. +Status ParseGcsPath(StringPiece fname, string* bucket, string* object) { if (!bucket || !object) { return errors::Internal("bucket and object cannot be null."); } - StringPiece matched_bucket, matched_object; - if (!strings::Scanner(fname) - .OneLiteral("gs://") - .RestartCapture() - .ScanEscapedUntil('/') - .OneLiteral("/") - .GetResult(&matched_object, &matched_bucket)) { - return errors::InvalidArgument("Couldn't parse GCS path: " + fname); + if (!fname.Consume("gs://")) { + return errors::InvalidArgument("GCS path must start with gs://"); + } + auto first_slash = fname.find('/'); + if (first_slash == -1) { + *bucket = fname.ToString(); + *object = string(); + } else { + *bucket = fname.substr(0, first_slash).ToString(); + fname.remove_prefix(first_slash + 1); + *object = fname.ToString(); } - // 'matched_bucket' contains a trailing slash, exclude it. - *bucket = string(matched_bucket.data(), matched_bucket.size() - 1); - *object = string(matched_object.data(), matched_object.size()); return Status::OK(); } @@ -370,9 +370,13 @@ bool GcsFileSystem::FileExists(const string& fname) { LOG(ERROR) << "Could not initialize the HTTP request."; return false; } - request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket, "/o/", - request->EscapeString(object_prefix), - "?fields=size")); + if (!object_prefix.empty()) { + request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket, "/o/", + request->EscapeString(object_prefix), + "?fields=size")); + } else { + request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket)); + } request->AddAuthBearerHeader(auth_token); return request->Send().ok(); } @@ -389,58 +393,74 @@ Status GcsFileSystem::GetChildren(const string& dirname, string bucket, object_prefix; TF_RETURN_IF_ERROR(ParseGcsPath(sanitized_dirname, &bucket, &object_prefix)); - string auth_token; - TF_RETURN_IF_ERROR(AuthProvider::GetToken(auth_provider_.get(), &auth_token)); + string nextPageToken; + while (true) { // A loop over multiple result pages. + string auth_token; + TF_RETURN_IF_ERROR( + AuthProvider::GetToken(auth_provider_.get(), &auth_token)); - std::unique_ptr<char[]> scratch(new char[kBufferSize]); - StringPiece response_piece; - std::unique_ptr<HttpRequest> request(http_request_factory_->Create()); - TF_RETURN_IF_ERROR(request->Init()); - TF_RETURN_IF_ERROR(request->SetUri( - strings::StrCat(kGcsUriBase, "b/", bucket, "/o?prefix=", - request->EscapeString(object_prefix), "&fields=items"))); - TF_RETURN_IF_ERROR(request->AddAuthBearerHeader(auth_token)); - // TODO(surkov): Implement pagination using maxResults and pageToken - // instead, so that all items can be read regardless of their count. - // Currently one item takes about 1KB in the response, so with a 1MB - // buffer size this will read fewer than 1000 objects. - TF_RETURN_IF_ERROR( - request->SetResultBuffer(scratch.get(), kBufferSize, &response_piece)); - TF_RETURN_IF_ERROR(request->Send()); - std::stringstream response_stream; - response_stream << response_piece; - Json::Value root; - Json::Reader reader; - if (!reader.parse(response_stream.str(), root)) { - return errors::Internal("Couldn't parse JSON response from GCS."); - } - const auto items = root.get("items", Json::Value::null); - if (items == Json::Value::null) { - // Empty results. - return Status::OK(); - } - if (!items.isArray()) { - return errors::Internal("Expected an array 'items' in the GCS response."); - } - for (size_t i = 0; i < items.size(); i++) { - const auto item = items.get(i, Json::Value::null); - if (!item.isObject()) { - return errors::Internal( - "Unexpected JSON format: 'items' should be a list of objects."); + std::unique_ptr<char[]> scratch(new char[kBufferSize]); + StringPiece response_piece; + std::unique_ptr<HttpRequest> request(http_request_factory_->Create()); + TF_RETURN_IF_ERROR(request->Init()); + auto uri = strings::StrCat(kGcsUriBase, "b/", bucket, + "/o?fields=items%2Fname%2CnextPageToken"); + if (!object_prefix.empty()) { + uri = strings::StrCat(uri, "&prefix=", + request->EscapeString(object_prefix)); + } + if (!nextPageToken.empty()) { + uri = strings::StrCat(uri, "&pageToken=", + request->EscapeString(nextPageToken)); + } + TF_RETURN_IF_ERROR(request->SetUri(uri)); + TF_RETURN_IF_ERROR(request->AddAuthBearerHeader(auth_token)); + TF_RETURN_IF_ERROR( + request->SetResultBuffer(scratch.get(), kBufferSize, &response_piece)); + TF_RETURN_IF_ERROR(request->Send()); + std::stringstream response_stream; + response_stream << response_piece; + Json::Value root; + Json::Reader reader; + if (!reader.parse(response_stream.str(), root)) { + return errors::Internal("Couldn't parse JSON response from GCS."); } - const auto name = item.get("name", Json::Value::null); - if (name == Json::Value::null || !name.isString()) { + const auto items = root.get("items", Json::Value::null); + if (items == Json::Value::null) { + // Empty results. + return Status::OK(); + } + if (!items.isArray()) { + return errors::Internal("Expected an array 'items' in the GCS response."); + } + for (size_t i = 0; i < items.size(); i++) { + const auto item = items.get(i, Json::Value::null); + if (!item.isObject()) { + return errors::Internal( + "Unexpected JSON format: 'items' should be a list of objects."); + } + const auto name = item.get("name", Json::Value::null); + if (name == Json::Value::null || !name.isString()) { + return errors::Internal( + "Unexpected JSON format: 'items.name' is missing or not a string."); + } + // The names should be relative to the 'dirname'. That means the + // 'object_prefix', which is part of 'dirname', should be removed from the + // beginning of 'name'. + string name_str(name.asString()); + result->emplace_back(name_str.begin() + object_prefix.size(), + name_str.end()); + } + const auto token = root.get("nextPageToken", Json::Value::null); + if (token == Json::Value::null) { + return Status::OK(); + } + if (!token.isString()) { return errors::Internal( - "Unexpected JSON format: 'items.name' is missing or not a string."); + "Unexpected response: nextPageToken is not a string"); } - // The names should be relative to the 'dirname'. That means the - // 'object_prefix', which is part of 'dirname', should be removed from the - // beginning of 'name'. - string name_str(name.asString()); - result->emplace_back(name_str.begin() + object_prefix.size(), - name_str.end()); + nextPageToken = token.asString(); } - return Status::OK(); } Status GcsFileSystem::DeleteFile(const string& fname) { @@ -466,12 +486,8 @@ Status GcsFileSystem::CreateDir(const string& dirname) { return Status::OK(); } // Checks that the directory is empty (i.e no objects with this prefix exist). // If it is, does nothing, because directories are not entities in GCS. Status GcsFileSystem::DeleteDir(const string& dirname) { - string sanitized_dirname = dirname; - if (!dirname.empty() && dirname.back() != '/') { - sanitized_dirname += "/"; - } std::vector<string> children; - TF_RETURN_IF_ERROR(GetChildren(sanitized_dirname, &children)); + TF_RETURN_IF_ERROR(GetChildren(dirname, &children)); if (!children.empty()) { return errors::InvalidArgument("Cannot delete a non-empty directory."); } diff --git a/tensorflow/core/platform/cloud/gcs_file_system_test.cc b/tensorflow/core/platform/cloud/gcs_file_system_test.cc index 9c195ba144..b43a5778ee 100644 --- a/tensorflow/core/platform/cloud/gcs_file_system_test.cc +++ b/tensorflow/core/platform/cloud/gcs_file_system_test.cc @@ -25,7 +25,7 @@ namespace { std::vector<HttpRequest*> CreateGetThreeChildrenRequest() { std::vector<HttpRequest*> requests({new FakeHttpRequest( "Uri: https://www.googleapis.com/storage/v1/b/bucket/o?" - "prefix=path%2F&fields=items\n" + "fields=items%2Fname%2CnextPageToken&prefix=path%2F\n" "Auth Token: fake_token\n", "{\"items\": [ " " { \"name\": \"path/file1.txt\" }," @@ -236,6 +236,25 @@ TEST(GcsFileSystemTest, FileExists) { EXPECT_FALSE(fs.FileExists("gs://bucket/path/file2.txt")); } +TEST(GcsFileSystemTest, FileExists_BucketOnly) { + std::vector<HttpRequest*> requests( + {new FakeHttpRequest( + "Uri: https://www.googleapis.com/storage/v1/b/bucket1\n" + "Auth Token: fake_token\n", + "{\"size\": \"100\"}"), + new FakeHttpRequest( + "Uri: https://www.googleapis.com/storage/v1/b/bucket2\n" + "Auth Token: fake_token\n", + "", errors::NotFound("404"))}); + GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), + std::unique_ptr<HttpRequest::Factory>( + new FakeHttpRequestFactory(&requests)), + 0 /* read ahead bytes */); + + EXPECT_TRUE(fs.FileExists("gs://bucket1")); + EXPECT_FALSE(fs.FileExists("gs://bucket2/")); +} + TEST(GcsFileSystemTest, GetChildren_ThreeFiles) { auto requests = CreateGetThreeChildrenRequest(); GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), @@ -262,10 +281,27 @@ TEST(GcsFileSystemTest, GetChildren_ThreeFiles_NoSlash) { ExpectGetThreeChildrenFiles(children); } +TEST(GcsFileSystemTest, GetChildren_Root) { + std::vector<HttpRequest*> requests({new FakeHttpRequest( + "Uri: https://www.googleapis.com/storage/v1/b/bucket-a-b-c/o?" + "fields=items%2Fname%2CnextPageToken\n" + "Auth Token: fake_token\n", + "{}")}); + GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), + std::unique_ptr<HttpRequest::Factory>( + new FakeHttpRequestFactory(&requests)), + 0 /* read ahead bytes */); + + std::vector<string> children; + TF_EXPECT_OK(fs.GetChildren("gs://bucket-a-b-c", &children)); + + EXPECT_EQ(0, children.size()); +} + TEST(GcsFileSystemTest, GetChildren_Empty) { std::vector<HttpRequest*> requests({new FakeHttpRequest( "Uri: https://www.googleapis.com/storage/v1/b/bucket/o?" - "prefix=path%2F&fields=items\n" + "fields=items%2Fname%2CnextPageToken&prefix=path%2F\n" "Auth Token: fake_token\n", "{}")}); GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), @@ -279,6 +315,42 @@ TEST(GcsFileSystemTest, GetChildren_Empty) { EXPECT_EQ(0, children.size()); } +TEST(GcsFileSystemTest, GetChildren_Pagination) { + std::vector<HttpRequest*> requests( + {new FakeHttpRequest( + "Uri: https://www.googleapis.com/storage/v1/b/bucket/o?" + "fields=items%2Fname%2CnextPageToken&prefix=path%2F\n" + "Auth Token: fake_token\n", + "{\"nextPageToken\": \"ABCD==\", " + " \"items\": [ " + " { \"name\": \"path/file1.txt\" }," + " { \"name\": \"path/subpath/file2.txt\" }," + " { \"name\": \"path/file3.txt\" }]}"), + new FakeHttpRequest( + "Uri: https://www.googleapis.com/storage/v1/b/bucket/o?" + "fields=items%2Fname%2CnextPageToken&prefix=path%2F" + "&pageToken=ABCD==\n" + "Auth Token: fake_token\n", + "{\"items\": [ " + " { \"name\": \"path/file4.txt\" }," + " { \"name\": \"path/file5.txt\" }]}")}); + + GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), + std::unique_ptr<HttpRequest::Factory>( + new FakeHttpRequestFactory(&requests)), + 0 /* read ahead bytes */); + + std::vector<string> children; + TF_EXPECT_OK(fs.GetChildren("gs://bucket/path", &children)); + + EXPECT_EQ(5, children.size()); + EXPECT_EQ("file1.txt", children[0]); + EXPECT_EQ("subpath/file2.txt", children[1]); + EXPECT_EQ("file3.txt", children[2]); + EXPECT_EQ("file4.txt", children[3]); + EXPECT_EQ("file5.txt", children[4]); +} + TEST(GcsFileSystemTest, DeleteFile) { std::vector<HttpRequest*> requests( {new FakeHttpRequest("Uri: https://www.googleapis.com/storage/v1/b" @@ -297,7 +369,7 @@ TEST(GcsFileSystemTest, DeleteFile) { TEST(GcsFileSystemTest, DeleteDir_Empty) { std::vector<HttpRequest*> requests({new FakeHttpRequest( "Uri: https://www.googleapis.com/storage/v1/b/bucket/o?" - "prefix=path%2F&fields=items\n" + "fields=items%2Fname%2CnextPageToken&prefix=path%2F\n" "Auth Token: fake_token\n", "{}")}); GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), @@ -311,7 +383,7 @@ TEST(GcsFileSystemTest, DeleteDir_Empty) { TEST(GcsFileSystemTest, DeleteDir_NonEmpty) { std::vector<HttpRequest*> requests({new FakeHttpRequest( "Uri: https://www.googleapis.com/storage/v1/b/bucket/o?" - "prefix=path%2F&fields=items\n" + "fields=items%2Fname%2CnextPageToken&prefix=path%2F\n" "Auth Token: fake_token\n", "{\"items\": [ " " { \"name\": \"path/file1.txt\" }]}")}); |