From 5eef3a21a0df996c407a78cdfbdcdd11ce4f6f34 Mon Sep 17 00:00:00 2001 From: "A. Unique TensorFlower" Date: Fri, 30 Mar 2018 11:32:58 -0700 Subject: Break FileSystem's dependency on ThreadPool. PiperOrigin-RevId: 191092932 --- .../contrib/android/asset_manager_filesystem.cc | 6 + .../contrib/android/asset_manager_filesystem.h | 3 + .../contrib/makefile/proto_text_cc_files.txt | 1 + tensorflow/core/BUILD | 1 + tensorflow/core/platform/file_system.cc | 93 --------------- tensorflow/core/platform/file_system.h | 4 +- tensorflow/core/platform/file_system_helper.cc | 126 +++++++++++++++++++++ tensorflow/core/platform/file_system_helper.h | 51 +++++++++ .../core/platform/hadoop/hadoop_file_system.cc | 6 + .../core/platform/hadoop/hadoop_file_system.h | 3 + tensorflow/core/platform/null_file_system.h | 6 + .../core/platform/posix/posix_file_system.cc | 6 + tensorflow/core/platform/posix/posix_file_system.h | 3 + tensorflow/core/platform/s3/s3_file_system.cc | 6 + tensorflow/core/platform/s3/s3_file_system.h | 3 + .../core/platform/windows/windows_file_system.cc | 4 +- tensorflow/core/util/memmapped_file_system.cc | 6 + tensorflow/core/util/memmapped_file_system.h | 2 + 18 files changed, 233 insertions(+), 97 deletions(-) create mode 100644 tensorflow/core/platform/file_system_helper.cc create mode 100644 tensorflow/core/platform/file_system_helper.h (limited to 'tensorflow') diff --git a/tensorflow/contrib/android/asset_manager_filesystem.cc b/tensorflow/contrib/android/asset_manager_filesystem.cc index 380a652435..fe2d13e636 100644 --- a/tensorflow/contrib/android/asset_manager_filesystem.cc +++ b/tensorflow/contrib/android/asset_manager_filesystem.cc @@ -19,6 +19,7 @@ limitations under the License. 
#include "tensorflow/core/lib/strings/str_util.h" #include "tensorflow/core/platform/env.h" +#include "tensorflow/core/platform/file_system_helper.h" namespace tensorflow { namespace { @@ -243,6 +244,11 @@ bool AssetManagerFileSystem::DirectoryExists(const std::string& fname) { return AAssetDir_getNextFileName(dir.get()) != NULL; } +Status AssetManagerFileSystem::GetMatchingPaths(const string& pattern, + std::vector* results) { + return internal::GetMatchingPaths(this, Env::Default(), pattern, results); +} + Status AssetManagerFileSystem::NewWritableFile( const string& fname, std::unique_ptr* result) { return errors::Unimplemented("Asset storage is read only."); diff --git a/tensorflow/contrib/android/asset_manager_filesystem.h b/tensorflow/contrib/android/asset_manager_filesystem.h index 665304b5ee..a87ff42ae2 100644 --- a/tensorflow/contrib/android/asset_manager_filesystem.h +++ b/tensorflow/contrib/android/asset_manager_filesystem.h @@ -66,6 +66,9 @@ class AssetManagerFileSystem : public FileSystem { Status DeleteDir(const string& d) override; Status RenameFile(const string& s, const string& t) override; + Status GetMatchingPaths(const string& pattern, + std::vector* results) override; + private: string RemoveAssetPrefix(const string& name); diff --git a/tensorflow/contrib/makefile/proto_text_cc_files.txt b/tensorflow/contrib/makefile/proto_text_cc_files.txt index 77c936d8c5..76428bc1d4 100644 --- a/tensorflow/contrib/makefile/proto_text_cc_files.txt +++ b/tensorflow/contrib/makefile/proto_text_cc_files.txt @@ -12,6 +12,7 @@ tensorflow/core/platform/posix/env.cc tensorflow/core/platform/posix/load_library.cc tensorflow/core/platform/posix/env_time.cc tensorflow/core/platform/file_system.cc +tensorflow/core/platform/file_system_helper.cc tensorflow/core/platform/env.cc tensorflow/core/platform/env_time.cc tensorflow/core/platform/setround.cc diff --git a/tensorflow/core/BUILD b/tensorflow/core/BUILD index 21f7866abd..7d5ae1c5b5 100644 --- a/tensorflow/core/BUILD 
+++ b/tensorflow/core/BUILD @@ -349,6 +349,7 @@ cc_library( "platform/env.h", "platform/env_time.h", "platform/file_system.h", + "platform/file_system_helper.h", "platform/fingerprint.h", "platform/init_main.h", "platform/logging.h", diff --git a/tensorflow/core/platform/file_system.cc b/tensorflow/core/platform/file_system.cc index a2f42f44ac..b55e94d552 100644 --- a/tensorflow/core/platform/file_system.cc +++ b/tensorflow/core/platform/file_system.cc @@ -18,7 +18,6 @@ limitations under the License. #include #include "tensorflow/core/lib/core/errors.h" -#include "tensorflow/core/lib/core/threadpool.h" #include "tensorflow/core/lib/io/path.h" #include "tensorflow/core/lib/strings/str_util.h" #include "tensorflow/core/lib/strings/strcat.h" @@ -28,28 +27,6 @@ limitations under the License. namespace tensorflow { -namespace { - -constexpr int kNumThreads = 8; - -// Run a function in parallel using a ThreadPool, but skip the ThreadPool -// on the iOS platform due to its problems with more than a few threads. -void ForEach(int first, int last, const std::function& f) { -#if TARGET_OS_IPHONE - for (int i = first; i < last; i++) { - f(i); - } -#else - int num_threads = std::min(kNumThreads, last - first); - thread::ThreadPool threads(Env::Default(), "ForEach", num_threads); - for (int i = first; i < last; i++) { - threads.Schedule([f, i] { f(i); }); - } -#endif -} - -} // anonymous namespace - FileSystem::~FileSystem() {} string FileSystem::TranslateName(const string& name) const { @@ -94,76 +71,6 @@ bool FileSystem::FilesExist(const std::vector& files, return result; } -Status FileSystem::GetMatchingPaths(const string& pattern, - std::vector* results) { - results->clear(); - // Find the fixed prefix by looking for the first wildcard. 
- string fixed_prefix = pattern.substr(0, pattern.find_first_of("*?[\\")); - string eval_pattern = pattern; - std::vector<string> all_files; - string dir = io::Dirname(fixed_prefix).ToString(); - // If dir is empty then we need to fix up fixed_prefix and eval_pattern to - // include . as the top level directory. - if (dir.empty()) { - dir = "."; - fixed_prefix = io::JoinPath(dir, fixed_prefix); - eval_pattern = io::JoinPath(dir, pattern); - } - - // Setup a BFS to explore everything under dir. - std::deque<string> dir_q; - dir_q.push_back(dir); - Status ret; // Status to return. - // children_dir_status holds is_dir status for children. It can have three - // possible values: OK for true; FAILED_PRECONDITION for false; CANCELLED - // if we don't calculate IsDirectory (we might do that because there isn't - // any point in exploring that child path). - std::vector<Status> children_dir_status; - while (!dir_q.empty()) { - string current_dir = dir_q.front(); - dir_q.pop_front(); - std::vector<string> children; - Status s = GetChildren(current_dir, &children); - ret.Update(s); - if (children.empty()) continue; - // This IsDirectory call can be expensive for some FS. Parallelizing it. - children_dir_status.resize(children.size()); - ForEach(0, children.size(), - [this, &current_dir, &children, &fixed_prefix, - &children_dir_status](int i) { - const string child_path = io::JoinPath(current_dir, children[i]); - // In case the child_path doesn't start with the fixed_prefix then - // we don't need to explore this path. - if (!str_util::StartsWith(child_path, fixed_prefix)) { - children_dir_status[i] = Status(tensorflow::error::CANCELLED, - "Operation not needed"); - } else { - children_dir_status[i] = IsDirectory(child_path); - } - }); - for (int i = 0; i < children.size(); ++i) { - const string child_path = io::JoinPath(current_dir, children[i]); - // If the IsDirectory call was cancelled we bail. 
- if (children_dir_status[i].code() == tensorflow::error::CANCELLED) { - continue; - } - // If the child is a directory add it to the queue. - if (children_dir_status[i].ok()) { - dir_q.push_back(child_path); - } - all_files.push_back(child_path); - } - } - - // Match all obtained files to the input pattern. - for (const auto& f : all_files) { - if (Env::Default()->MatchPath(f, eval_pattern)) { - results->push_back(f); - } - } - return ret; -} - Status FileSystem::DeleteRecursively(const string& dirname, int64* undeleted_files, int64* undeleted_dirs) { diff --git a/tensorflow/core/platform/file_system.h b/tensorflow/core/platform/file_system.h index 8f99766e15..077b1d79cf 100644 --- a/tensorflow/core/platform/file_system.h +++ b/tensorflow/core/platform/file_system.h @@ -138,10 +138,8 @@ class FileSystem { /// * OK - no errors /// * UNIMPLEMENTED - Some underlying functions (like GetChildren) are not /// implemented - /// The default implementation uses a combination of GetChildren, MatchPath - /// and IsDirectory. virtual Status GetMatchingPaths(const string& pattern, - std::vector* results); + std::vector* results) = 0; /// \brief Obtains statistics for the given path. virtual Status Stat(const string& fname, FileStatistics* stat) = 0; diff --git a/tensorflow/core/platform/file_system_helper.cc b/tensorflow/core/platform/file_system_helper.cc new file mode 100644 index 0000000000..22c5057281 --- /dev/null +++ b/tensorflow/core/platform/file_system_helper.cc @@ -0,0 +1,126 @@ +/* Copyright 2018 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#include "tensorflow/core/platform/file_system_helper.h" + +#include +#include +#include + +#include "tensorflow/core/lib/core/status.h" +#include "tensorflow/core/lib/core/threadpool.h" +#include "tensorflow/core/lib/io/path.h" +#include "tensorflow/core/lib/strings/str_util.h" +#include "tensorflow/core/platform/env.h" +#include "tensorflow/core/platform/file_system.h" +#include "tensorflow/core/platform/platform.h" + +namespace tensorflow { +namespace internal { + +namespace { + +constexpr int kNumThreads = 8; + +// Run a function in parallel using a ThreadPool, but skip the ThreadPool +// on the iOS platform due to its problems with more than a few threads. +void ForEach(int first, int last, const std::function& f) { +#if TARGET_OS_IPHONE + for (int i = first; i < last; i++) { + f(i); + } +#else + int num_threads = std::min(kNumThreads, last - first); + thread::ThreadPool threads(Env::Default(), "ForEach", num_threads); + for (int i = first; i < last; i++) { + threads.Schedule([f, i] { f(i); }); + } +#endif +} + +} // namespace + +Status GetMatchingPaths(FileSystem* fs, Env* env, const string& pattern, + std::vector* results) { + results->clear(); + // Find the fixed prefix by looking for the first wildcard. + string fixed_prefix = pattern.substr(0, pattern.find_first_of("*?[\\")); + string eval_pattern = pattern; + std::vector all_files; + string dir = io::Dirname(fixed_prefix).ToString(); + // If dir is empty then we need to fix up fixed_prefix and eval_pattern to + // include . as the top level directory. + if (dir.empty()) { + dir = "."; + fixed_prefix = io::JoinPath(dir, fixed_prefix); + eval_pattern = io::JoinPath(dir, pattern); + } + + // Setup a BFS to explore everything under dir. + std::deque dir_q; + dir_q.push_back(dir); + Status ret; // Status to return. 
+ // children_dir_status holds is_dir status for children. It can have three + // possible values: OK for true; FAILED_PRECONDITION for false; CANCELLED + // if we don't calculate IsDirectory (we might do that because there isn't + // any point in exploring that child path). + std::vector<Status> children_dir_status; + while (!dir_q.empty()) { + string current_dir = dir_q.front(); + dir_q.pop_front(); + std::vector<string> children; + Status s = fs->GetChildren(current_dir, &children); + ret.Update(s); + if (children.empty()) continue; + // This IsDirectory call can be expensive for some FS. Parallelizing it. + children_dir_status.resize(children.size()); + ForEach(0, children.size(), + [fs, &current_dir, &children, &fixed_prefix, + &children_dir_status](int i) { + const string child_path = io::JoinPath(current_dir, children[i]); + // In case the child_path doesn't start with the fixed_prefix then + // we don't need to explore this path. + if (!str_util::StartsWith(child_path, fixed_prefix)) { + children_dir_status[i] = Status(tensorflow::error::CANCELLED, + "Operation not needed"); + } else { + children_dir_status[i] = fs->IsDirectory(child_path); + } + }); + for (int i = 0; i < children.size(); ++i) { + const string child_path = io::JoinPath(current_dir, children[i]); + // If the IsDirectory call was cancelled we bail. + if (children_dir_status[i].code() == tensorflow::error::CANCELLED) { + continue; + } + // If the child is a directory add it to the queue. + if (children_dir_status[i].ok()) { + dir_q.push_back(child_path); + } + all_files.push_back(child_path); + } + } + + // Match all obtained files to the input pattern. 
+ for (const auto& f : all_files) { + if (env->MatchPath(f, eval_pattern)) { + results->push_back(f); + } + } + return ret; +} + +} // namespace internal +} // namespace tensorflow diff --git a/tensorflow/core/platform/file_system_helper.h b/tensorflow/core/platform/file_system_helper.h new file mode 100644 index 0000000000..8d812b0e38 --- /dev/null +++ b/tensorflow/core/platform/file_system_helper.h @@ -0,0 +1,51 @@ +/* Copyright 2018 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#ifndef TENSORFLOW_CORE_PLATFORM_FILE_SYSTEM_HELPER_H_ +#define TENSORFLOW_CORE_PLATFORM_FILE_SYSTEM_HELPER_H_ + +#include +#include + +#include "tensorflow/core/lib/core/status.h" + +namespace tensorflow { + +class FileSystem; +class Env; + +namespace internal { + +// Given a pattern, stores in 'results' the set of paths (in the given file +// system) that match that pattern. +// +// This helper may be used by implementations of FileSystem::GetMatchingPaths() +// in order to provide parallel scanning of subdirectories (except on iOS). +// +// Arguments: +// fs: may not be null and will be used to identify directories and list +// their contents. +// env: may not be null and will be used to check if a match has been found. +// pattern: see FileSystem::GetMatchingPaths() for details. +// results: will be cleared and may not be null. 
+// +// Returns an error status if any call to 'fs' failed. +Status GetMatchingPaths(FileSystem* fs, Env* env, const string& pattern, + std::vector* results); + +} // namespace internal +} // namespace tensorflow + +#endif // TENSORFLOW_CORE_PLATFORM_FILE_SYSTEM_HELPER_H_ diff --git a/tensorflow/core/platform/hadoop/hadoop_file_system.cc b/tensorflow/core/platform/hadoop/hadoop_file_system.cc index 74863293a3..9a71fbe2b7 100644 --- a/tensorflow/core/platform/hadoop/hadoop_file_system.cc +++ b/tensorflow/core/platform/hadoop/hadoop_file_system.cc @@ -22,6 +22,7 @@ limitations under the License. #include "tensorflow/core/lib/strings/strcat.h" #include "tensorflow/core/platform/env.h" #include "tensorflow/core/platform/file_system.h" +#include "tensorflow/core/platform/file_system_helper.h" #include "tensorflow/core/platform/logging.h" #include "tensorflow/core/platform/mutex.h" #include "tensorflow/core/platform/posix/error.h" @@ -396,6 +397,11 @@ Status HadoopFileSystem::GetChildren(const string& dir, return Status::OK(); } +Status HadoopFileSystem::GetMatchingPaths(const string& pattern, + std::vector* results) { + return internal::GetMatchingPaths(this, Env::Default(), pattern, results); +} + Status HadoopFileSystem::DeleteFile(const string& fname) { hdfsFS fs = nullptr; TF_RETURN_IF_ERROR(Connect(fname, &fs)); diff --git a/tensorflow/core/platform/hadoop/hadoop_file_system.h b/tensorflow/core/platform/hadoop/hadoop_file_system.h index 5f2b222622..6af7a698ff 100644 --- a/tensorflow/core/platform/hadoop/hadoop_file_system.h +++ b/tensorflow/core/platform/hadoop/hadoop_file_system.h @@ -49,6 +49,9 @@ class HadoopFileSystem : public FileSystem { Status GetChildren(const string& dir, std::vector* result) override; + Status GetMatchingPaths(const string& pattern, + std::vector* results) override; + Status DeleteFile(const string& fname) override; Status CreateDir(const string& name) override; diff --git a/tensorflow/core/platform/null_file_system.h 
b/tensorflow/core/platform/null_file_system.h index 008e6d54d0..420abc1ada 100644 --- a/tensorflow/core/platform/null_file_system.h +++ b/tensorflow/core/platform/null_file_system.h @@ -22,6 +22,7 @@ limitations under the License. #include "tensorflow/core/platform/env.h" #include "tensorflow/core/platform/file_system.h" +#include "tensorflow/core/platform/file_system_helper.h" namespace tensorflow { @@ -65,6 +66,11 @@ class NullFileSystem : public FileSystem { return errors::Unimplemented("GetChildren unimplemented"); } + Status GetMatchingPaths(const string& pattern, + std::vector* results) override { + return internal::GetMatchingPaths(this, Env::Default(), pattern, results); + } + Status DeleteFile(const string& fname) override { return errors::Unimplemented("DeleteFile unimplemented"); } diff --git a/tensorflow/core/platform/posix/posix_file_system.cc b/tensorflow/core/platform/posix/posix_file_system.cc index 9a8021565c..47bfa020ce 100644 --- a/tensorflow/core/platform/posix/posix_file_system.cc +++ b/tensorflow/core/platform/posix/posix_file_system.cc @@ -31,6 +31,7 @@ limitations under the License. 
#include "tensorflow/core/lib/core/status.h" #include "tensorflow/core/lib/strings/strcat.h" #include "tensorflow/core/platform/env.h" +#include "tensorflow/core/platform/file_system_helper.h" #include "tensorflow/core/platform/logging.h" #include "tensorflow/core/platform/posix/error.h" #include "tensorflow/core/platform/posix/posix_file_system.h" @@ -225,6 +226,11 @@ Status PosixFileSystem::GetChildren(const string& dir, return Status::OK(); } +Status PosixFileSystem::GetMatchingPaths(const string& pattern, + std::vector* results) { + return internal::GetMatchingPaths(this, Env::Default(), pattern, results); +} + Status PosixFileSystem::DeleteFile(const string& fname) { Status result; if (unlink(TranslateName(fname).c_str()) != 0) { diff --git a/tensorflow/core/platform/posix/posix_file_system.h b/tensorflow/core/platform/posix/posix_file_system.h index 98ffa43b8a..e8898d0a97 100644 --- a/tensorflow/core/platform/posix/posix_file_system.h +++ b/tensorflow/core/platform/posix/posix_file_system.h @@ -47,6 +47,9 @@ class PosixFileSystem : public FileSystem { Status Stat(const string& fname, FileStatistics* stats) override; + Status GetMatchingPaths(const string& pattern, + std::vector* results) override; + Status DeleteFile(const string& fname) override; Status CreateDir(const string& name) override; diff --git a/tensorflow/core/platform/s3/s3_file_system.cc b/tensorflow/core/platform/s3/s3_file_system.cc index 301fcb9dbf..ee423699b2 100644 --- a/tensorflow/core/platform/s3/s3_file_system.cc +++ b/tensorflow/core/platform/s3/s3_file_system.cc @@ -15,6 +15,7 @@ limitations under the License. 
#include "tensorflow/core/platform/s3/s3_file_system.h" #include "tensorflow/core/lib/io/path.h" #include "tensorflow/core/lib/strings/str_util.h" +#include "tensorflow/core/platform/file_system_helper.h" #include "tensorflow/core/platform/mutex.h" #include "tensorflow/core/platform/s3/aws_logging.h" #include "tensorflow/core/platform/s3/s3_crypto.h" @@ -497,6 +498,11 @@ Status S3FileSystem::Stat(const string& fname, FileStatistics* stats) { return Status::OK(); } +Status S3FileSystem::GetMatchingPaths(const string& pattern, + std::vector* results) { + return internal::GetMatchingPaths(this, Env::Default(), pattern, results); +} + Status S3FileSystem::DeleteFile(const string& fname) { string bucket, object; TF_RETURN_IF_ERROR(ParseS3Path(fname, false, &bucket, &object)); diff --git a/tensorflow/core/platform/s3/s3_file_system.h b/tensorflow/core/platform/s3/s3_file_system.h index 31264be621..5d0565b378 100644 --- a/tensorflow/core/platform/s3/s3_file_system.h +++ b/tensorflow/core/platform/s3/s3_file_system.h @@ -46,6 +46,9 @@ class S3FileSystem : public FileSystem { Status Stat(const string& fname, FileStatistics* stat) override; + Status GetMatchingPaths(const string& pattern, + std::vector* results) override; + Status DeleteFile(const string& fname) override; Status CreateDir(const string& name) override; diff --git a/tensorflow/core/platform/windows/windows_file_system.cc b/tensorflow/core/platform/windows/windows_file_system.cc index 682e46e0fc..dc2efbeaf5 100644 --- a/tensorflow/core/platform/windows/windows_file_system.cc +++ b/tensorflow/core/platform/windows/windows_file_system.cc @@ -28,6 +28,7 @@ limitations under the License. 
#include "tensorflow/core/lib/core/error_codes.pb.h" #include "tensorflow/core/lib/strings/strcat.h" #include "tensorflow/core/platform/env.h" +#include "tensorflow/core/platform/file_system_helper.h" #include "tensorflow/core/platform/logging.h" #include "tensorflow/core/platform/posix/error.h" #include "tensorflow/core/platform/windows/error.h" @@ -494,7 +495,8 @@ Status WindowsFileSystem::GetMatchingPaths(const string& pattern, // but no code appears to rely on this behavior. string converted_pattern(pattern); std::replace(converted_pattern.begin(), converted_pattern.end(), '\\', '/'); - TF_RETURN_IF_ERROR(FileSystem::GetMatchingPaths(converted_pattern, results)); + TF_RETURN_IF_ERROR(internal::GetMatchingPaths(this, Env::Default(), + converted_pattern, results)); for (string& result : *results) { std::replace(result.begin(), result.end(), '/', '\\'); } diff --git a/tensorflow/core/util/memmapped_file_system.cc b/tensorflow/core/util/memmapped_file_system.cc index a0f43d2d4a..ea0a381f4f 100644 --- a/tensorflow/core/util/memmapped_file_system.cc +++ b/tensorflow/core/util/memmapped_file_system.cc @@ -157,6 +157,12 @@ Status MemmappedFileSystem::GetChildren(const string& filename, return errors::Unimplemented("memmapped format doesn't support GetChildren"); } +Status MemmappedFileSystem::GetMatchingPaths(const string& pattern, + std::vector* results) { + return errors::Unimplemented( + "memmapped format doesn't support GetMatchingPaths"); +} + Status MemmappedFileSystem::DeleteFile(const string& filename) { return errors::Unimplemented("memmapped format doesn't support DeleteFile"); } diff --git a/tensorflow/core/util/memmapped_file_system.h b/tensorflow/core/util/memmapped_file_system.h index 541587aeab..76cc4911f5 100644 --- a/tensorflow/core/util/memmapped_file_system.h +++ b/tensorflow/core/util/memmapped_file_system.h @@ -85,6 +85,8 @@ class MemmappedFileSystem : public FileSystem { Status NewAppendableFile(const string& fname, std::unique_ptr* result) 
override; Status GetChildren(const string& dir, std::vector* r) override; + Status GetMatchingPaths(const string& pattern, + std::vector* results) override; Status DeleteFile(const string& f) override; Status CreateDir(const string& d) override; Status DeleteDir(const string& d) override; -- cgit v1.2.3