diff options
5 files changed, 165 insertions, 5 deletions
diff --git a/tensorflow/core/grappler/optimizers/graph_optimizer.h b/tensorflow/core/grappler/optimizers/graph_optimizer.h index 765dd13263..bd6bf9f860 100644 --- a/tensorflow/core/grappler/optimizers/graph_optimizer.h +++ b/tensorflow/core/grappler/optimizers/graph_optimizer.h @@ -16,8 +16,11 @@ limitations under the License. #ifndef TENSORFLOW_CORE_GRAPPLER_OPTIMIZERS_GRAPH_OPTIMIZER_H_ #define TENSORFLOW_CORE_GRAPPLER_OPTIMIZERS_GRAPH_OPTIMIZER_H_ +#include <atomic> #include "tensorflow/core/framework/graph.pb.h" #include "tensorflow/core/lib/core/status.h" +#include "tensorflow/core/platform/mutex.h" +#include "tensorflow/core/platform/thread_annotations.h" namespace tensorflow { namespace grappler { @@ -29,6 +32,7 @@ struct GrapplerItem; // optimization of a GrapplerItem for running on a cluster. class GraphOptimizer { public: + GraphOptimizer() : is_cancelled_(false) {} virtual ~GraphOptimizer() {} virtual string name() const = 0; @@ -45,8 +49,25 @@ class GraphOptimizer { // call to Optimize) performed. Lower "result" scores are better. virtual void Feedback(Cluster* cluster, const GrapplerItem& item, const GraphDef& optimized_graph, double result) = 0; + + // Best effort cancellation. Sets is_cancelled to true and requests that the + // optimizer returns as soon as possible from active calls to Optimize() or + // FeedBack(). + void Cancel() { is_cancelled_ = true; } + + bool is_cancelled() const { return is_cancelled_; } + + private: + std::atomic<bool> is_cancelled_; }; +#define GRAPPLER_RETURN_IF_CANCELLED() \ + do { \ + if (is_cancelled()) { \ + return errors::DeadlineExceeded(this->name(), " was cancelled."); \ + } \ + } while (0) + } // end namespace grappler } // end namespace tensorflow diff --git a/tensorflow/core/grappler/optimizers/meta_optimizer.cc b/tensorflow/core/grappler/optimizers/meta_optimizer.cc index 3f33b16ba8..7488cedec5 100644 --- a/tensorflow/core/grappler/optimizers/meta_optimizer.cc +++ b/tensorflow/core/grappler/optimizers/meta_optimizer.cc @@ -14,6 +14,9 @@ limitations under the License. ==============================================================================*/ #include "tensorflow/core/grappler/optimizers/meta_optimizer.h" + +#include <memory> + #include "tensorflow/core/common_runtime/function.h" #include "tensorflow/core/framework/function.pb.h" #include "tensorflow/core/framework/versions.pb.h" @@ -37,7 +40,11 @@ limitations under the License. #include "tensorflow/core/grappler/utils/functions.h" #include "tensorflow/core/grappler/utils/topological_sort.h" #include "tensorflow/core/lib/core/status.h" +#include "tensorflow/core/lib/core/threadpool.h" #include "tensorflow/core/lib/gtl/map_util.h" +#include "tensorflow/core/platform/cpu_info.h" +#include "tensorflow/core/platform/notification.h" +#include "tensorflow/core/platform/thread_annotations.h" #include "tensorflow/core/util/ptr_util.h" namespace tensorflow { @@ -115,6 +122,21 @@ std::unique_ptr<GraphOptimizer> MetaOptimizer::MakeNewOptimizer( #undef MK_OPT +MetaOptimizer::MetaOptimizer(DeviceBase* cpu_device, const RewriterConfig& cfg) + : cpu_device_(cpu_device), cfg_(cfg) { + // TODO(rmlarsen): Increase kNumThreads to, say, port::NumSchedulableCPUs() + // if we want to the threadpool for parallelizing Grappler + const int kNumThreads = 1; + thread_pool_ = absl::make_unique<thread::ThreadPool>( + Env::Default(), "MetaOptimizerThreadPool", kNumThreads); +} + +MetaOptimizer::~MetaOptimizer() { + // The ThreadPool destructor waits for threads to finish, so we don't + // pull the rug out from under them. + thread_pool_.reset(); +} + Status MetaOptimizer::InitializeOptimizers( std::vector<std::unique_ptr<GraphOptimizer>>* optimizers) const { if (cfg_.disable_meta_optimizer()) { @@ -310,6 +332,7 @@ Status MetaOptimizer::OptimizeGraph(Cluster* cluster, const GrapplerItem& item, VLOG(4) << "Starting optimization iteration " << iteration; for (const auto& optimizer : optimizers) { + GRAPPLER_RETURN_IF_CANCELLED(); // Some optimizers can run only once. if (iteration > 0 && IsRunOnceOptimizer(optimizer->name())) continue; // Some must run only on the last iteration. @@ -368,6 +391,7 @@ Status MetaOptimizer::RunOptimizer( // resets optimized_graph to an empty graph. optimized_graph->Swap(&optimized_item->graph); *optimized_graph = GraphDef(); + // TODO(rmlarsen): Add timeout for individual optimizers. Status status = optimizer->Optimize(cluster, *optimized_item, optimized_graph); uint64 end_us = Env::Default()->NowMicros(); @@ -389,14 +413,15 @@ Status MetaOptimizer::RunOptimizer( return status; } -Status MetaOptimizer::Optimize(Cluster* cluster, const GrapplerItem& item, - GraphDef* optimized_graph) { +Status MetaOptimizer::OptimizeMainGraphAndFunctionLibrary( + Cluster* cluster, const GrapplerItem& item, GraphDef* optimized_graph) { VLOG(1) << "Starting optimization for grappler item: " << item.id; optimization_results_.clear(); // 1. Optimize main graph TF_RETURN_IF_ERROR(OptimizeGraph(cluster, item, optimized_graph)); VLOG(1) << "Optimized main graph."; + GRAPPLER_RETURN_IF_CANCELLED(); // Skip optimizing functions if this is a TPU graph. Currently, Grappler // passes do not handle TPU functions correctly in a variety of ways (Note @@ -432,6 +457,8 @@ Status MetaOptimizer::Optimize(Cluster* cluster, const GrapplerItem& item, optimize_function_library = false; for (const FunctionDef& func : optimized_graph->library().function()) { + GRAPPLER_RETURN_IF_CANCELLED(); + const string& func_name = func.signature().name(); // Skip already optimized functions. @@ -506,6 +533,43 @@ void MetaOptimizer::PrintResult() { } } +Status MetaOptimizer::Optimize(Cluster* cluster, const GrapplerItem& item, + GraphDef* optimized_graph) { + const int64 kFiveMinutesInUsec = 5 * 60 * 1000 * 1000; + const int64 timeout_usec = (cfg_.meta_optimizer_timeout_ms() == 0 + ? kFiveMinutesInUsec + : cfg_.meta_optimizer_timeout_ms() * 1000); + if (timeout_usec < 0) { + return OptimizeMainGraphAndFunctionLibrary(cluster, item, optimized_graph); + } + + GraphDef optimized_with_timeout; + Status status; + Notification done; + thread_pool_->Schedule( + [this, cluster, &done, &optimized_with_timeout, &item, &status]() { + status = this->OptimizeMainGraphAndFunctionLibrary( + cluster, item, &optimized_with_timeout); + done.Notify(); + }); + + const bool notified = WaitForNotificationWithTimeout(&done, timeout_usec); + if (notified && status.ok()) { + optimized_graph->Swap(&optimized_with_timeout); + } else { + *optimized_graph = item.graph; + if (!notified) { + this->Cancel(); + done.WaitForNotification(); + status = errors::DeadlineExceeded( + "Grappler MetaOptimizer timed out after ", + static_cast<float>(timeout_usec) / (1000 * 1000), " seconds"); + LOG(WARNING) << status.error_message(); + } + } + return status; +} + void MetaOptimizer::Feedback(Cluster* cluster, const GrapplerItem& item, const GraphDef& pruned_graph, double result) { // Nothing to do for MetaOptimizer. diff --git a/tensorflow/core/grappler/optimizers/meta_optimizer.h b/tensorflow/core/grappler/optimizers/meta_optimizer.h index 99a0a33ffa..35d6a4559b 100644 --- a/tensorflow/core/grappler/optimizers/meta_optimizer.h +++ b/tensorflow/core/grappler/optimizers/meta_optimizer.h @@ -20,6 +20,7 @@ limitations under the License. #include "tensorflow/core/grappler/grappler_item.h" #include "tensorflow/core/grappler/optimizers/graph_optimizer.h" #include "tensorflow/core/lib/core/status.h" +#include "tensorflow/core/lib/core/threadpool.h" #include "tensorflow/core/protobuf/rewriter_config.pb.h" namespace tensorflow { @@ -28,9 +29,8 @@ namespace grappler { // Run the other grappler optimizers based on the specified rewriter config. class MetaOptimizer : public GraphOptimizer { public: - MetaOptimizer(DeviceBase* cpu_device, const RewriterConfig& cfg) - : cpu_device_(cpu_device), cfg_(cfg) {} - ~MetaOptimizer() override = default; + MetaOptimizer(DeviceBase* cpu_device, const RewriterConfig& cfg); + ~MetaOptimizer(); string name() const override { return "meta_optimizer"; }; @@ -65,9 +65,18 @@ class MetaOptimizer : public GraphOptimizer { Status OptimizeGraph(Cluster* cluster, const GrapplerItem& item, GraphDef* optimized_graph); + // Run optimization passes over the main graph and for functions in the + // function library. + Status OptimizeMainGraphAndFunctionLibrary(Cluster* cluster, + const GrapplerItem& item, + GraphDef* optimized_graph); + DeviceBase* const cpu_device_; // may be NULL RewriterConfig cfg_; + // Thread pool used for launching optimizers asynchronously. + std::unique_ptr<thread::ThreadPool> thread_pool_; + struct OptimizerResult { string optimizer_name; string result; diff --git a/tensorflow/core/grappler/optimizers/meta_optimizer_test.cc b/tensorflow/core/grappler/optimizers/meta_optimizer_test.cc index 3f3f43382f..7f1dd91f09 100644 --- a/tensorflow/core/grappler/optimizers/meta_optimizer_test.cc +++ b/tensorflow/core/grappler/optimizers/meta_optimizer_test.cc @@ -461,6 +461,68 @@ TEST_F(MetaOptimizerTest, OptimizeFunctionLibraryWithRestrictions) { EXPECT_FALSE(allowed_optimizations_my_mul_2->non_differentiable_rewrites); } +class SleepingOptimizer : public CustomGraphOptimizer { + public: + SleepingOptimizer() {} + string name() const override { return "test_optimizer"; } + + Status Init( + const tensorflow::RewriterConfig_CustomGraphOptimizer* config) override { + return Status::OK(); + } + + Status Optimize(Cluster* cluster, const GrapplerItem& item, + GraphDef* optimized_graph) override { + *optimized_graph = item.graph; + optimized_graph->add_node(); + sleep(1); + return Status::OK(); + } + + void Feedback(Cluster* cluster, const GrapplerItem& item, + const GraphDef& optimized_graph, double result) override {} +}; + +REGISTER_GRAPH_OPTIMIZER(SleepingOptimizer); + +TEST_F(MetaOptimizerTest, OptimizerTimesOut) { + TrivialTestGraphInputYielder fake_input(4, 1, 10, false, {"CPU:0"}); + GrapplerItem item; + CHECK(fake_input.NextItem(&item)); + + RewriterConfig rewriter_config; + rewriter_config.add_optimizers("SleepingOptimizer"); + rewriter_config.set_min_graph_nodes(-1); + rewriter_config.set_meta_optimizer_timeout_ms(1500); + rewriter_config.set_meta_optimizer_iterations(RewriterConfig::TWO); + + MetaOptimizer optimizer(nullptr, rewriter_config); + GraphDef output; + const Status status = optimizer.Optimize(nullptr, item, &output); + EXPECT_EQ(status.error_message(), + "Grappler MetaOptimizer timed out after 1.5 seconds"); + // Make sure the graph was reverted to the original regardless of when the + // optimizer timed out. + CompareGraphs(item.graph, output); +} + +TEST_F(MetaOptimizerTest, OptimizerDoesNotTimeOut) { + TrivialTestGraphInputYielder fake_input(4, 1, 10, false, {"CPU:0"}); + GrapplerItem item; + CHECK(fake_input.NextItem(&item)); + + RewriterConfig rewriter_config; + rewriter_config.add_optimizers("SleepingOptimizer"); + rewriter_config.set_min_graph_nodes(-1); + rewriter_config.set_meta_optimizer_timeout_ms(1500); + rewriter_config.set_meta_optimizer_iterations(RewriterConfig::ONE); + MetaOptimizer optimizer(nullptr, rewriter_config); + GraphDef output; + const Status status = optimizer.Optimize(nullptr, item, &output); + TF_EXPECT_OK(status); + EXPECT_EQ(item.graph.node_size() + 1, output.node_size()); +} + } // namespace } // namespace grappler } // namespace tensorflow diff --git a/tensorflow/core/protobuf/rewriter_config.proto b/tensorflow/core/protobuf/rewriter_config.proto index 8c31468ff5..7ccd54b818 100644 --- a/tensorflow/core/protobuf/rewriter_config.proto +++ b/tensorflow/core/protobuf/rewriter_config.proto @@ -83,6 +83,10 @@ message RewriterConfig { // Controls how many times we run the optimizers in meta optimizer (default // is once). NumIterationsType meta_optimizer_iterations = 12; + // Maximum number of milliseconds to spend optimizing a single graph before + // timing out. If equal to 0 the system picks a default (currently 5 minutes). + // If less than 0 the optimizer will never time out. + int64 meta_optimizer_timeout_ms = 20; // The minimum number of nodes in a graph to optimizer. For smaller graphs, // optimization is skipped. |