aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--tensorflow/core/platform/cloud/gcs_file_system.cc10
-rw-r--r--tensorflow/core/platform/cloud/gcs_file_system_test.cc47
-rw-r--r--tensorflow/core/platform/cloud/retrying_file_system.cc7
-rw-r--r--tensorflow/core/platform/cloud/retrying_utils.cc17
-rw-r--r--tensorflow/core/platform/cloud/retrying_utils.h7
-rw-r--r--tensorflow/core/platform/cloud/retrying_utils_test.cc73
6 files changed, 151 insertions, 10 deletions
diff --git a/tensorflow/core/platform/cloud/gcs_file_system.cc b/tensorflow/core/platform/cloud/gcs_file_system.cc
index fdd3dcf389..6022caa3d1 100644
--- a/tensorflow/core/platform/cloud/gcs_file_system.cc
+++ b/tensorflow/core/platform/cloud/gcs_file_system.cc
@@ -1015,7 +1015,7 @@ Status GcsFileSystem::CreateDir(const string& dirname) {
}
// Checks that the directory is empty (i.e no objects with this prefix exist).
-// If it is, does nothing, because directories are not entities in GCS.
+// Deletes the GCS directory marker if it exists.
Status GcsFileSystem::DeleteDir(const string& dirname) {
std::vector<string> children;
// A directory is considered empty either if there are no matching objects
@@ -1107,8 +1107,12 @@ Status GcsFileSystem::RenameObject(const string& src, const string& target) {
"locations or storage classes is not supported.");
}
- TF_RETURN_IF_ERROR(DeleteFile(src));
- return Status::OK();
+ // In case the delete API call failed, but the deletion actually happened
+ // on the server side, we can't just retry the whole RenameFile operation
+ // because the source object is already gone.
+ return RetryingUtils::DeleteWithRetries(
+ std::bind(&GcsFileSystem::DeleteFile, this, src),
+ initial_retry_delay_usec_);
}
Status GcsFileSystem::IsDirectory(const string& fname) {
diff --git a/tensorflow/core/platform/cloud/gcs_file_system_test.cc b/tensorflow/core/platform/cloud/gcs_file_system_test.cc
index 3d6b33f704..7fb70acf11 100644
--- a/tensorflow/core/platform/cloud/gcs_file_system_test.cc
+++ b/tensorflow/core/platform/cloud/gcs_file_system_test.cc
@@ -1113,6 +1113,53 @@ TEST(GcsFileSystemTest, RenameFile_Object) {
fs.RenameFile("gs://bucket/path/src.txt", "gs://bucket/path/dst.txt"));
}
+/// Tests the scenario when deletion returns a failure, but actually succeeds.
+TEST(GcsFileSystemTest, RenameFile_Object_DeletionRetried) {
+ std::vector<HttpRequest*> requests(
+ {// IsDirectory is checking whether there are children objects.
+ new FakeHttpRequest(
+ "Uri: https://www.googleapis.com/storage/v1/b/bucket/o?"
+ "fields=items%2Fname%2CnextPageToken&prefix=path%2Fsrc.txt%2F"
+ "&maxResults=1\n"
+ "Auth Token: fake_token\n",
+ "{}"),
+ // IsDirectory is checking if the path exists as an object.
+ new FakeHttpRequest(
+ "Uri: https://www.googleapis.com/storage/v1/b/bucket/o/"
+ "path%2Fsrc.txt?fields=size%2Cupdated\n"
+ "Auth Token: fake_token\n",
+ strings::StrCat("{\"size\": \"1010\","
+ "\"updated\": \"2016-04-29T23:15:24.896Z\"}")),
+ // Copying to the new location.
+ new FakeHttpRequest(
+ "Uri: https://www.googleapis.com/storage/v1/b/bucket/o/"
+ "path%2Fsrc.txt/rewriteTo/b/bucket/o/path%2Fdst.txt\n"
+ "Auth Token: fake_token\n"
+ "Post: yes\n",
+ "{\"done\": true}"),
+ // Deleting the original file - the deletion returns a failure.
+ new FakeHttpRequest(
+ "Uri: https://www.googleapis.com/storage/v1/b/bucket/o/"
+ "path%2Fsrc.txt\n"
+ "Auth Token: fake_token\n"
+ "Delete: yes\n",
+ "", errors::Unavailable("503"), 503),
+ // Deleting the original file again - the deletion returns NOT_FOUND.
+ new FakeHttpRequest(
+ "Uri: https://www.googleapis.com/storage/v1/b/bucket/o/"
+ "path%2Fsrc.txt\n"
+ "Auth Token: fake_token\n"
+ "Delete: yes\n",
+ "", errors::NotFound("404"), 404)});
+ GcsFileSystem fs(std::unique_ptr<AuthProvider>(new FakeAuthProvider),
+ std::unique_ptr<HttpRequest::Factory>(
+ new FakeHttpRequestFactory(&requests)),
+ 0 /* read ahead bytes */, 0 /* initial retry delay */);
+
+ TF_EXPECT_OK(
+ fs.RenameFile("gs://bucket/path/src.txt", "gs://bucket/path/dst.txt"));
+}
+
/// Tests the case when rewrite couldn't complete in one RPC.
TEST(GcsFileSystemTest, RenameFile_Object_Incomplete) {
std::vector<HttpRequest*> requests(
diff --git a/tensorflow/core/platform/cloud/retrying_file_system.cc b/tensorflow/core/platform/cloud/retrying_file_system.cc
index 18ddbf01dd..1ff32f1d4e 100644
--- a/tensorflow/core/platform/cloud/retrying_file_system.cc
+++ b/tensorflow/core/platform/cloud/retrying_file_system.cc
@@ -134,7 +134,6 @@ Status RetryingFileSystem::FileExists(const string& fname) {
return RetryingUtils::CallWithRetries(
std::bind(&FileSystem::FileExists, base_file_system_.get(), fname),
initial_delay_microseconds_);
- ;
}
Status RetryingFileSystem::Stat(const string& fname, FileStatistics* stat) {
@@ -159,7 +158,7 @@ Status RetryingFileSystem::GetMatchingPaths(const string& pattern,
}
Status RetryingFileSystem::DeleteFile(const string& fname) {
- return RetryingUtils::CallWithRetries(
+ return RetryingUtils::DeleteWithRetries(
std::bind(&FileSystem::DeleteFile, base_file_system_.get(), fname),
initial_delay_microseconds_);
}
@@ -171,7 +170,7 @@ Status RetryingFileSystem::CreateDir(const string& dirname) {
}
Status RetryingFileSystem::DeleteDir(const string& dirname) {
- return RetryingUtils::CallWithRetries(
+ return RetryingUtils::DeleteWithRetries(
std::bind(&FileSystem::DeleteDir, base_file_system_.get(), dirname),
initial_delay_microseconds_);
}
@@ -198,7 +197,7 @@ Status RetryingFileSystem::IsDirectory(const string& dirname) {
Status RetryingFileSystem::DeleteRecursively(const string& dirname,
int64* undeleted_files,
int64* undeleted_dirs) {
- return RetryingUtils::CallWithRetries(
+ return RetryingUtils::DeleteWithRetries(
std::bind(&FileSystem::DeleteRecursively, base_file_system_.get(),
dirname, undeleted_files, undeleted_dirs),
initial_delay_microseconds_);
diff --git a/tensorflow/core/platform/cloud/retrying_utils.cc b/tensorflow/core/platform/cloud/retrying_utils.cc
index 705adbfdc7..096c77c6e3 100644
--- a/tensorflow/core/platform/cloud/retrying_utils.cc
+++ b/tensorflow/core/platform/cloud/retrying_utils.cc
@@ -82,4 +82,21 @@ Status RetryingUtils::CallWithRetries(
retries++;
}
}
+
+Status RetryingUtils::DeleteWithRetries(
+ const std::function<Status()>& delete_func,
+ const int64 initial_delay_microseconds) {
+ bool is_retried = false;
+ return RetryingUtils::CallWithRetries(
+ [delete_func, &is_retried]() {
+ const auto& status = delete_func();
+ if (is_retried && status.code() == error::NOT_FOUND) {
+ return Status::OK();
+ }
+ is_retried = true;
+ return status;
+ },
+ initial_delay_microseconds);
+}
+
} // namespace tensorflow
diff --git a/tensorflow/core/platform/cloud/retrying_utils.h b/tensorflow/core/platform/cloud/retrying_utils.h
index ca5c40dc93..99ab216e97 100644
--- a/tensorflow/core/platform/cloud/retrying_utils.h
+++ b/tensorflow/core/platform/cloud/retrying_utils.h
@@ -36,6 +36,13 @@ class RetryingUtils {
static Status CallWithRetries(const std::function<Status()>& f,
const int64 initial_delay_microseconds,
const std::function<void(int64)>& sleep_usec);
+ /// \brief A retrying wrapper for a function that deletes a resource.
+ ///
+ /// The function takes care of the scenario when a delete operation
+ /// returns a failure but succeeds under the hood: if a retry returns
+ /// NOT_FOUND, the whole operation is considered a success.
+ static Status DeleteWithRetries(const std::function<Status()>& delete_func,
+ const int64 initial_delay_microseconds);
};
} // namespace tensorflow
diff --git a/tensorflow/core/platform/cloud/retrying_utils_test.cc b/tensorflow/core/platform/cloud/retrying_utils_test.cc
index bdfe864fd7..6eb340e094 100644
--- a/tensorflow/core/platform/cloud/retrying_utils_test.cc
+++ b/tensorflow/core/platform/cloud/retrying_utils_test.cc
@@ -22,7 +22,7 @@ limitations under the License.
namespace tensorflow {
namespace {
-TEST(RetryingUtilsTest, RetryDelays) {
+TEST(RetryingUtilsTest, CallWithRetries_RetryDelays) {
std::vector<double> requested_delays; // requested delays in seconds
std::function<void(int64)> sleep = [&requested_delays](int64 delay) {
requested_delays.emplace_back(delay / 1000000.0);
@@ -52,7 +52,7 @@ TEST(RetryingUtilsTest, RetryDelays) {
EXPECT_NEAR(32.0, requested_delays[9], 1.0);
}
-TEST(RetryingUtilsTest, NotFoundIsNotRetried) {
+TEST(RetryingUtilsTest, CallWithRetries_NotFoundIsNotRetried) {
std::vector<Status> results(
{errors::Unavailable("Failed."), errors::NotFound("Not found.")});
std::function<Status()> f = [&results]() {
@@ -64,7 +64,20 @@ TEST(RetryingUtilsTest, NotFoundIsNotRetried) {
RetryingUtils::CallWithRetries(f, 0).code());
}
-TEST(RetryingUtilsTest, EventualSuccess) {
+TEST(RetryingUtilsTest, CallWithRetries_ImmediateSuccess) {
+ std::vector<Status> results({Status::OK()});
+ std::function<void(int64)> sleep = [](int64 delay) {
+ ADD_FAILURE() << "Unexpected call to sleep.";
+ };
+ std::function<Status()> f = [&results]() {
+ auto result = results[0];
+ results.erase(results.begin());
+ return result;
+ };
+ TF_EXPECT_OK(RetryingUtils::CallWithRetries(f, 1.0, sleep));
+}
+
+TEST(RetryingUtilsTest, CallWithRetries_EventualSuccess) {
std::vector<Status> results({errors::Unavailable("Failed."),
errors::Unavailable("Failed again."),
Status::OK()});
@@ -76,5 +89,59 @@ TEST(RetryingUtilsTest, EventualSuccess) {
TF_EXPECT_OK(RetryingUtils::CallWithRetries(f, 0));
}
+TEST(RetryingUtilsTest, DeleteWithRetries_ImmediateSuccess) {
+ std::vector<Status> delete_results({Status::OK()});
+ const auto delete_func = [&delete_results]() {
+ auto result = delete_results[0];
+ delete_results.erase(delete_results.begin());
+ return result;
+ };
+ TF_EXPECT_OK(RetryingUtils::DeleteWithRetries(delete_func, 0));
+}
+
+TEST(RetryingUtilsTest, DeleteWithRetries_EventualSuccess) {
+ std::vector<Status> delete_results({errors::Unavailable(""), Status::OK()});
+ const auto delete_func = [&delete_results]() {
+ auto result = delete_results[0];
+ delete_results.erase(delete_results.begin());
+ return result;
+ };
+ TF_EXPECT_OK(RetryingUtils::DeleteWithRetries(delete_func, 0));
+}
+
+TEST(RetryingUtilsTest, DeleteWithRetries_PermissionDeniedNotRetried) {
+ std::vector<Status> delete_results(
+ {errors::Unavailable(""), errors::PermissionDenied("")});
+ const auto delete_func = [&delete_results]() {
+ auto result = delete_results[0];
+ delete_results.erase(delete_results.begin());
+ return result;
+ };
+ EXPECT_EQ(errors::Code::PERMISSION_DENIED,
+ RetryingUtils::DeleteWithRetries(delete_func, 0).code());
+}
+
+TEST(RetryingUtilsTest, DeleteWithRetries_SuccessThroughFileNotFound) {
+ std::vector<Status> delete_results(
+ {errors::Unavailable(""), errors::NotFound("")});
+ const auto delete_func = [&delete_results]() {
+ auto result = delete_results[0];
+ delete_results.erase(delete_results.begin());
+ return result;
+ };
+ TF_EXPECT_OK(RetryingUtils::DeleteWithRetries(delete_func, 0));
+}
+
+TEST(RetryingUtilsTest, DeleteWithRetries_FirstNotFoundReturnedAsIs) {
+ std::vector<Status> delete_results({errors::NotFound("")});
+ const auto delete_func = [&delete_results]() {
+ auto result = delete_results[0];
+ delete_results.erase(delete_results.begin());
+ return result;
+ };
+ EXPECT_EQ(error::NOT_FOUND,
+ RetryingUtils::DeleteWithRetries(delete_func, 0).code());
+}
+
} // namespace
} // namespace tensorflow