| author | 2018-10-01 17:18:24 -0700 |
|---|---|
| committer | 2018-10-01 17:22:39 -0700 |
| commit | bfbe2bbe6a83a4acfa8f87aa5c8228e74b37bb61 (patch) |
| tree | 18a274c3c1a8f917fc8addf9630ddff55436a4fd /tensorflow/core/framework |
| parent | 80f8931682aeaae89786f0940892a6557b4cfd67 (diff) |
[tf.data] More robust solution for input pipeline <--> performance model coordination.
PiperOrigin-RevId: 215309735
Diffstat (limited to 'tensorflow/core/framework')
-rw-r--r-- | tensorflow/core/framework/dataset.h | 12
-rw-r--r-- | tensorflow/core/framework/model.cc  | 83
-rw-r--r-- | tensorflow/core/framework/model.h   | 42
3 files changed, 71 insertions, 66 deletions
diff --git a/tensorflow/core/framework/dataset.h b/tensorflow/core/framework/dataset.h
index 697e0604bf..8c1151cb56 100644
--- a/tensorflow/core/framework/dataset.h
+++ b/tensorflow/core/framework/dataset.h
@@ -657,15 +657,15 @@ class DatasetBaseIterator : public IteratorBase {
   // When performance modeling is enabled, this method adds a tunable parameter
   // to the model node corresponding to this iterator.
   //
-  // The performance modeling logic may use `value` to set the value of the
+  // The performance modeling logic may use `state` to set the value of the
   // tunable parameter at any point during the lifetime of this iterator. When
-  // it does, it notifies `cond_var`.
+  // it does, it acquires `state->mu` and notifies `state->cond_var`.
   void AddTunableParameter(IteratorContext* ctx, const string& name,
-                           std::atomic<int64>* value, int64 min, int64 max,
-                           condition_variable* cond_var) {
+                           std::shared_ptr<model::SharedState> state, int64 min,
+                           int64 max) {
     if (ctx->model()) {
-      ctx->model()->AddTunableParameter(prefix(), name, value, min, max,
-                                        cond_var);
+      ctx->model()->AddTunableParameter(prefix(), name, std::move(state), min,
+                                        max);
     }
   }
diff --git a/tensorflow/core/framework/model.cc b/tensorflow/core/framework/model.cc
index b0330ec990..bfdb3a6658 100644
--- a/tensorflow/core/framework/model.cc
+++ b/tensorflow/core/framework/model.cc
@@ -296,12 +296,12 @@ void Model::AddProcessingTime(const string& name, int64 delta) {
 
 void Model::AddTunableParameter(const string& node_name,
                                 const string& parameter_name,
-                                std::atomic<int64>* value, int64 min, int64 max,
-                                condition_variable* cond_var) {
+                                std::shared_ptr<SharedState> state, int64 min,
+                                int64 max) {
   tf_shared_lock l(mu_);
   auto node = *gtl::FindOrNull(lookup_table_, node_name);
   DCHECK(node);
-  node->add_tunable_param(parameter_name, value, min, max, cond_var);
+  node->add_tunable_param(parameter_name, std::move(state), min, max);
 }
 
 // The optimization algorithm starts by setting all tunable parallelism
@@ -311,54 +311,55 @@ void Model::AddTunableParameter(const string& node_name,
 // is less than or equal to the processing time needed to produce an element
 // divided by CPU budget.
 void Model::Optimize(int64 cpu_budget) {
-  tf_shared_lock lock(mu_);
   std::vector<std::shared_ptr<Model::Node::Tunable>> tunables;
-  const int64 processing_time = ProcessingTime();
-  tunables = CollectTunables();
-  for (auto tunable : tunables) {
-    tunable->value = 1;
-  }
-  while (true) {
-    const int64 output_time = OutputTime();
-    bool all_tunables = true;
-    for (auto& tunable : tunables) {
-      if (tunable->value < tunable->max) {
-        all_tunables = false;
+  {
+    tf_shared_lock lock(mu_);
+    const int64 processing_time = ProcessingTime();
+    tunables = CollectTunables();
+    for (auto tunable : tunables) {
+      tunable->value = 1;
+    }
+    while (true) {
+      const int64 output_time = OutputTime();
+      bool all_tunables = true;
+      for (auto& tunable : tunables) {
+        if (tunable->value < tunable->max) {
+          all_tunables = false;
+          break;
+        }
+      }
+      if (output_time < processing_time / cpu_budget || all_tunables) {
        break;
      }
-    }
-    if (output_time < processing_time / cpu_budget || all_tunables) {
-      break;
-    }
-    int64 best_delta = -1;
-    Model::Node::Tunable* best_tunable = nullptr;
-    for (auto& tunable : tunables) {
-      if (tunable->value == tunable->max) {
-        continue;
+      int64 best_delta = -1;
+      Model::Node::Tunable* best_tunable = nullptr;
+      for (auto& tunable : tunables) {
+        if (tunable->value == tunable->max) {
+          continue;
+        }
+        tunable->value++;
+        int64 delta = output_time - OutputTime();
+        if (delta > best_delta) {
+          best_delta = delta;
+          best_tunable = tunable.get();
+        }
+        tunable->value--;
      }
-      tunable->value++;
-      int64 delta = output_time - OutputTime();
-      if (delta > best_delta) {
-        best_delta = delta;
-        best_tunable = tunable.get();
+      if (!best_tunable) {
+        // NOTE: This can happen because we are performing the optimization
+        // while the model data is changing. If this becomes an issue, we should
+        // look into performing the optimization using a model snapshot.
+        break;
      }
-      tunable->value--;
+      best_tunable->value++;
    }
-    if (!best_tunable) {
-      // NOTE: This can happen because we are performing the optimization
-      // while the model data is changing. If this becomes an issue, we should
-      // look into performing the optimization using a model snapshot.
-      break;
-    }
-    best_tunable->value++;
   }
   VLOG(2) << "Number of knobs: " << tunables.size();
   for (auto& tunable : tunables) {
     VLOG(2) << "Setting tunable parameter: " << tunable->value;
-    tunable->value_ptr->store(tunable->value);
-    if (tunable->cond_var) {
-      tunable->cond_var->notify_all();
-    }
+    mutex_lock l(*tunable->state->mu);
+    tunable->state->value = tunable->value;
+    tunable->state->cond_var->notify_all();
   }
 }
diff --git a/tensorflow/core/framework/model.h b/tensorflow/core/framework/model.h
index 26402f5cd3..eae0fa70e8 100644
--- a/tensorflow/core/framework/model.h
+++ b/tensorflow/core/framework/model.h
@@ -33,6 +33,19 @@ namespace tensorflow {
 namespace data {
 namespace model {
 
+// Represents thread-safe state that can be shared between an input pipeline and
+// the performance model.
+struct SharedState {
+ public:
+  explicit SharedState(int64 value, std::shared_ptr<mutex> mu,
+                       std::shared_ptr<condition_variable> cond_var)
+      : value(value), mu(std::move(mu)), cond_var(std::move(cond_var)) {}
+
+  std::shared_ptr<mutex> mu;
+  std::shared_ptr<condition_variable> cond_var;
+  int64 value;
+};
+
 // Abstract representation of a TensorFlow input pipeline that can be used
 // for collecting runtime information and optimizing performance. It collects
 // runtime information about execution of the input pipeline that is used to
@@ -62,8 +75,8 @@ class Model {
   // Adds a tunable parameter for the given node.
   void AddTunableParameter(const string& node_name,
                            const string& parameter_name,
-                           std::atomic<int64>* value, int64 min, int64 max,
-                           condition_variable* cond_var) LOCKS_EXCLUDED(mu_);
+                           std::shared_ptr<SharedState> value, int64 min,
+                           int64 max) LOCKS_EXCLUDED(mu_);
 
   // Runs optimization.
   void Optimize(int64 cpu_budget) LOCKS_EXCLUDED(mu_);
@@ -109,13 +122,8 @@ class Model {
    public:
     // Represents a tunable parameter.
     struct Tunable {
-      Tunable(std::atomic<int64>* value, int64 min, int64 max,
-              condition_variable* cond_var)
-          : value(*value),
-            min(min),
-            max(max),
-            value_ptr(value),
-            cond_var(cond_var) {}
+      Tunable(std::shared_ptr<SharedState> state, int64 min, int64 max)
+          : value(state->value), min(min), max(max), state(std::move(state)) {}
 
       // Identifies the model value of the parameter. This can be different from
       // the actual value (e.g. during optimization search).
@@ -127,12 +135,8 @@ class Model {
       // Identifies the maximum value of the parameter.
       int64 max;
 
-      // Points to the actual value of the parameter. Not owned.
-      std::atomic<int64>* value_ptr;
-
-      // If non-null, this condition variable is notified when the model updates
-      // the actual value of the parameter (via `value_ptr`). Not owned.
-      condition_variable* cond_var;
+      // Shared state of the parameter.
+      std::shared_ptr<SharedState> state;
     };
 
     Node(int64 id, const string& name, std::shared_ptr<Node> output)
@@ -158,12 +162,12 @@ class Model {
     }
 
     // Adds a tunable parameter.
-    void add_tunable_param(const string& name, std::atomic<int64>* value,
-                           int64 min, int64 max, condition_variable* cond_var)
-        LOCKS_EXCLUDED(mu_) {
+    void add_tunable_param(const string& name,
+                           std::shared_ptr<SharedState> state, int64 min,
+                           int64 max) LOCKS_EXCLUDED(mu_) {
      mutex_lock l(mu_);
      tunable_params_[name] =
-          std::make_shared<Tunable>(value, min, max, cond_var);
+          std::make_shared<Tunable>(std::move(state), min, max);
    }
 
     // Returns the unique node ID.
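To make the new coordination mechanism concrete, here is a minimal, self-contained sketch of the handshake that `SharedState` enables, using `std::mutex` and `std::condition_variable` as stand-ins for TensorFlow's `mutex` and `condition_variable` wrappers. Only the `SharedState` layout and the lock-then-notify pattern at the end of `Model::Optimize` come from the diff above; the `pipeline` and `optimizer` threads are hypothetical illustrations, not code from this commit.

```cpp
// Sketch only: std:: types stand in for TensorFlow's wrappers.
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>

// Mirrors model::SharedState from the diff: value is guarded by *mu,
// and changes are announced via *cond_var.
struct SharedState {
  SharedState(int64_t value, std::shared_ptr<std::mutex> mu,
              std::shared_ptr<std::condition_variable> cond_var)
      : mu(std::move(mu)), cond_var(std::move(cond_var)), value(value) {}

  std::shared_ptr<std::mutex> mu;
  std::shared_ptr<std::condition_variable> cond_var;
  int64_t value;
};

int main() {
  auto mu = std::make_shared<std::mutex>();
  auto cond_var = std::make_shared<std::condition_variable>();
  auto parallelism = std::make_shared<SharedState>(/*value=*/1, mu, cond_var);

  // Pipeline side (hypothetical iterator): block until the model raises the knob.
  std::thread pipeline([parallelism] {
    std::unique_lock<std::mutex> l(*parallelism->mu);
    parallelism->cond_var->wait(l, [&] { return parallelism->value > 1; });
    std::cout << "pipeline now using parallelism " << parallelism->value << "\n";
  });

  // Model side: mirrors the tail of Model::Optimize -- write the tuned value
  // under the shared mutex, then notify the waiting pipeline.
  std::thread optimizer([parallelism] {
    std::lock_guard<std::mutex> l(*parallelism->mu);
    parallelism->value = 4;
    parallelism->cond_var->notify_all();
  });

  optimizer.join();
  pipeline.join();
  return 0;
}
```

Compared with the previous `std::atomic<int64>*` plus optional `condition_variable*` pair, bundling the value, mutex, and condition variable into one shared-ownership struct lets the model publish a new value and wake waiters without polling or lifetime races, which appears to be the "more robust" coordination the commit title refers to.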