diff options
6 files changed, 151 insertions, 10 deletions
diff --git a/tensorflow/core/platform/cloud/gcs_file_system.cc b/tensorflow/core/platform/cloud/gcs_file_system.cc index fdd3dcf389..6022caa3d1 100644 --- a/tensorflow/core/platform/cloud/gcs_file_system.cc +++ b/tensorflow/core/platform/cloud/gcs_file_system.cc @@ -1015,7 +1015,7 @@ Status GcsFileSystem::CreateDir(const string& dirname) { } // Checks that the directory is empty (i.e no objects with this prefix exist). -// If it is, does nothing, because directories are not entities in GCS. +// Deletes the GCS directory marker if it exists. Status GcsFileSystem::DeleteDir(const string& dirname) { std::vector<string> children; // A directory is considered empty either if there are no matching objects @@ -1107,8 +1107,12 @@ Status GcsFileSystem::RenameObject(const string& src, const string& target) { "locations or storage classes is not supported."); } - TF_RETURN_IF_ERROR(DeleteFile(src)); - return Status::OK(); + // In case the delete API call failed, but the deletion actually happened + // on the server side, we can't just retry the whole RenameFile operation + // because the source object is already gone. + return RetryingUtils::DeleteWithRetries( + std::bind(&GcsFileSystem::DeleteFile, this, src), + initial_retry_delay_usec_); } Status GcsFileSystem::IsDirectory(const string& fname) { diff --git a/tensorflow/core/platform/cloud/gcs_file_system_test.cc b/tensorflow/core/platform/cloud/gcs_file_system_test.cc index 3d6b33f704..7fb70acf11 100644 --- a/tensorflow/core/platform/cloud/gcs_file_system_test.cc +++ b/tensorflow/core/platform/cloud/gcs_file_system_test.cc @@ -1113,6 +1113,53 @@ TEST(GcsFileSystemTest, RenameFile_Object) { fs.RenameFile("gs://bucket/path/src.txt", "gs://bucket/path/dst.txt")); } +/// Tests the scenario when deletion returns a failure, but actually succeeds. +TEST(GcsFileSystemTest, RenameFile_Object_DeletionRetried) { + std::vector<HttpRequest*> requests( + {// IsDirectory is checking whether there are children objects. + new FakeHttpRequest( + "Uri: https://www.googleapis.com/storage/v1/b/bucket/o?" + "fields=items%2Fname%2CnextPageToken&prefix=path%2Fsrc.txt%2F" + "&maxResults=1\n" + "Auth Token: fake_token\n", + "{}"), + // IsDirectory is checking if the path exists as an object. + new FakeHttpRequest( + "Uri: https://www.googleapis.com/storage/v1/b/bucket/o/" + "path%2Fsrc.txt?fields=size%2Cupdated\n" + "Auth Token: fake_token\n", + strings::StrCat("{\"size\": \"1010\"," + "\"updated\": \"2016-04-29T23:15:24.896Z\"}")), + // Copying to the new location. + new FakeHttpRequest( + "Uri: https://www.googleapis.com/storage/v1/b/bucket/o/" + "path%2Fsrc.txt/rewriteTo/b/bucket/o/path%2Fdst.txt\n" + "Auth Token: fake_token\n" + "Post: yes\n", + "{\"done\": true}"), + // Deleting the original file - the deletion returns a failure. + new FakeHttpRequest( + "Uri: https://www.googleapis.com/storage/v1/b/bucket/o/" + "path%2Fsrc.txt\n" + "Auth Token: fake_token\n" + "Delete: yes\n", + "", errors::Unavailable("503"), 503), + // Deleting the original file again - the deletion returns NOT_FOUND. + new FakeHttpRequest( + "Uri: https://www.googleapis.com/storage/v1/b/bucket/o/" + "path%2Fsrc.txt\n" + "Auth Token: fake_token\n" + "Delete: yes\n", + "", errors::NotFound("404"), 404)}); + GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider), + std::unique_ptr<HttpRequest::Factory>( + new FakeHttpRequestFactory(&requests)), + 0 /* read ahead bytes */, 0 /* initial retry delay */); + + TF_EXPECT_OK( + fs.RenameFile("gs://bucket/path/src.txt", "gs://bucket/path/dst.txt")); +} + /// Tests the case when rewrite couldn't complete in one RPC. TEST(GcsFileSystemTest, RenameFile_Object_Incomplete) { std::vector<HttpRequest*> requests( diff --git a/tensorflow/core/platform/cloud/retrying_file_system.cc b/tensorflow/core/platform/cloud/retrying_file_system.cc index 18ddbf01dd..1ff32f1d4e 100644 --- a/tensorflow/core/platform/cloud/retrying_file_system.cc +++ b/tensorflow/core/platform/cloud/retrying_file_system.cc @@ -134,7 +134,6 @@ Status RetryingFileSystem::FileExists(const string& fname) { return RetryingUtils::CallWithRetries( std::bind(&FileSystem::FileExists, base_file_system_.get(), fname), initial_delay_microseconds_); - ; } Status RetryingFileSystem::Stat(const string& fname, FileStatistics* stat) { @@ -159,7 +158,7 @@ Status RetryingFileSystem::GetMatchingPaths(const string& pattern, } Status RetryingFileSystem::DeleteFile(const string& fname) { - return RetryingUtils::CallWithRetries( + return RetryingUtils::DeleteWithRetries( std::bind(&FileSystem::DeleteFile, base_file_system_.get(), fname), initial_delay_microseconds_); } @@ -171,7 +170,7 @@ Status RetryingFileSystem::CreateDir(const string& dirname) { } Status RetryingFileSystem::DeleteDir(const string& dirname) { - return RetryingUtils::CallWithRetries( + return RetryingUtils::DeleteWithRetries( std::bind(&FileSystem::DeleteDir, base_file_system_.get(), dirname), initial_delay_microseconds_); } @@ -198,7 +197,7 @@ Status RetryingFileSystem::IsDirectory(const string& dirname) { Status RetryingFileSystem::DeleteRecursively(const string& dirname, int64* undeleted_files, int64* undeleted_dirs) { - return RetryingUtils::CallWithRetries( + return RetryingUtils::DeleteWithRetries( std::bind(&FileSystem::DeleteRecursively, base_file_system_.get(), dirname, undeleted_files, undeleted_dirs), initial_delay_microseconds_); diff --git a/tensorflow/core/platform/cloud/retrying_utils.cc b/tensorflow/core/platform/cloud/retrying_utils.cc index 705adbfdc7..096c77c6e3 100644 --- a/tensorflow/core/platform/cloud/retrying_utils.cc +++ b/tensorflow/core/platform/cloud/retrying_utils.cc @@ -82,4 +82,21 @@ Status RetryingUtils::CallWithRetries( retries++; } } + +Status RetryingUtils::DeleteWithRetries( + const std::function<Status()>& delete_func, + const int64 initial_delay_microseconds) { + bool is_retried = false; + return RetryingUtils::CallWithRetries( + [delete_func, &is_retried]() { + const auto& status = delete_func(); + if (is_retried && status.code() == error::NOT_FOUND) { + return Status::OK(); + } + is_retried = true; + return status; + }, + initial_delay_microseconds); +} + } // namespace tensorflow diff --git a/tensorflow/core/platform/cloud/retrying_utils.h b/tensorflow/core/platform/cloud/retrying_utils.h index ca5c40dc93..99ab216e97 100644 --- a/tensorflow/core/platform/cloud/retrying_utils.h +++ b/tensorflow/core/platform/cloud/retrying_utils.h @@ -36,6 +36,13 @@ class RetryingUtils { static Status CallWithRetries(const std::function<Status()>& f, const int64 initial_delay_microseconds, const std::function<void(int64)>& sleep_usec); + /// \brief A retrying wrapper for a function that deletes a resource. + /// + /// The function takes care of the scenario when a delete operation + /// returns a failure but succeeds under the hood: if a retry returns + /// NOT_FOUND, the whole operation is considered a success. + static Status DeleteWithRetries(const std::function<Status()>& delete_func, + const int64 initial_delay_microseconds); }; } // namespace tensorflow diff --git a/tensorflow/core/platform/cloud/retrying_utils_test.cc b/tensorflow/core/platform/cloud/retrying_utils_test.cc index bdfe864fd7..6eb340e094 100644 --- a/tensorflow/core/platform/cloud/retrying_utils_test.cc +++ b/tensorflow/core/platform/cloud/retrying_utils_test.cc @@ -22,7 +22,7 @@ limitations under the License. namespace tensorflow { namespace { -TEST(RetryingUtilsTest, RetryDelays) { +TEST(RetryingUtilsTest, CallWithRetries_RetryDelays) { std::vector<double> requested_delays; // requested delays in seconds std::function<void(int64)> sleep = [&requested_delays](int64 delay) { requested_delays.emplace_back(delay / 1000000.0); @@ -52,7 +52,7 @@ TEST(RetryingUtilsTest, RetryDelays) { EXPECT_NEAR(32.0, requested_delays[9], 1.0); } -TEST(RetryingUtilsTest, NotFoundIsNotRetried) { +TEST(RetryingUtilsTest, CallWithRetries_NotFoundIsNotRetried) { std::vector<Status> results( {errors::Unavailable("Failed."), errors::NotFound("Not found.")}); std::function<Status()> f = [&results]() { @@ -64,7 +64,20 @@ TEST(RetryingUtilsTest, NotFoundIsNotRetried) { RetryingUtils::CallWithRetries(f, 0).code()); } -TEST(RetryingUtilsTest, EventualSuccess) { +TEST(RetryingUtilsTest, CallWithRetries_ImmediateSuccess) { + std::vector<Status> results({Status::OK()}); + std::function<void(int64)> sleep = [](int64 delay) { + ADD_FAILURE() << "Unexpected call to sleep."; + }; + std::function<Status()> f = [&results]() { + auto result = results[0]; + results.erase(results.begin()); + return result; + }; + TF_EXPECT_OK(RetryingUtils::CallWithRetries(f, 1.0, sleep)); +} + +TEST(RetryingUtilsTest, CallWithRetries_EventualSuccess) { std::vector<Status> results({errors::Unavailable("Failed."), errors::Unavailable("Failed again."), Status::OK()}); @@ -76,5 +89,59 @@ TEST(RetryingUtilsTest, EventualSuccess) { TF_EXPECT_OK(RetryingUtils::CallWithRetries(f, 0)); } +TEST(RetryingUtilsTest, DeleteWithRetries_ImmediateSuccess) { + std::vector<Status> delete_results({Status::OK()}); + const auto delete_func = [&delete_results]() { + auto result = delete_results[0]; + delete_results.erase(delete_results.begin()); + return result; + }; + TF_EXPECT_OK(RetryingUtils::DeleteWithRetries(delete_func, 0)); +} + +TEST(RetryingUtilsTest, DeleteWithRetries_EventualSuccess) { + std::vector<Status> delete_results({errors::Unavailable(""), Status::OK()}); + const auto delete_func = [&delete_results]() { + auto result = delete_results[0]; + delete_results.erase(delete_results.begin()); + return result; + }; + TF_EXPECT_OK(RetryingUtils::DeleteWithRetries(delete_func, 0)); +} + +TEST(RetryingUtilsTest, DeleteWithRetries_PermissionDeniedNotRetried) { + std::vector<Status> delete_results( + {errors::Unavailable(""), errors::PermissionDenied("")}); + const auto delete_func = [&delete_results]() { + auto result = delete_results[0]; + delete_results.erase(delete_results.begin()); + return result; + }; + EXPECT_EQ(errors::Code::PERMISSION_DENIED, + RetryingUtils::DeleteWithRetries(delete_func, 0).code()); +} + +TEST(RetryingUtilsTest, DeleteWithRetries_SuccessThroughFileNotFound) { + std::vector<Status> delete_results( + {errors::Unavailable(""), errors::NotFound("")}); + const auto delete_func = [&delete_results]() { + auto result = delete_results[0]; + delete_results.erase(delete_results.begin()); + return result; + }; + TF_EXPECT_OK(RetryingUtils::DeleteWithRetries(delete_func, 0)); +} + +TEST(RetryingUtilsTest, DeleteWithRetries_FirstNotFoundReturnedAsIs) { + std::vector<Status> delete_results({errors::NotFound("")}); + const auto delete_func = [&delete_results]() { + auto result = delete_results[0]; + delete_results.erase(delete_results.begin()); + return result; + }; + EXPECT_EQ(error::NOT_FOUND, + RetryingUtils::DeleteWithRetries(delete_func, 0).code()); +} + } // namespace } // namespace tensorflow |