aboutsummaryrefslogtreecommitdiffhomepage
path: root/tensorflow/core/framework
diff options
context:
space:
mode:
authorGravatar Jiri Simsa <jsimsa@google.com>2018-10-01 17:18:24 -0700
committerGravatar TensorFlower Gardener <gardener@tensorflow.org>2018-10-01 17:22:39 -0700
commitbfbe2bbe6a83a4acfa8f87aa5c8228e74b37bb61 (patch)
tree18a274c3c1a8f917fc8addf9630ddff55436a4fd /tensorflow/core/framework
parent80f8931682aeaae89786f0940892a6557b4cfd67 (diff)
[tf.data] More robust solution for input pipeline <--> performance model coordination.
PiperOrigin-RevId: 215309735
Diffstat (limited to 'tensorflow/core/framework')
-rw-r--r--tensorflow/core/framework/dataset.h12
-rw-r--r--tensorflow/core/framework/model.cc83
-rw-r--r--tensorflow/core/framework/model.h42
3 files changed, 71 insertions, 66 deletions
diff --git a/tensorflow/core/framework/dataset.h b/tensorflow/core/framework/dataset.h
index 697e0604bf..8c1151cb56 100644
--- a/tensorflow/core/framework/dataset.h
+++ b/tensorflow/core/framework/dataset.h
@@ -657,15 +657,15 @@ class DatasetBaseIterator : public IteratorBase {
// When performance modeling is enabled, this method adds a tunable parameter
// to the model node corresponding to this iterator.
//
- // The performance modeling logic may use `value` to set the value of the
+ // The performance modeling logic may use `state` to set the value of the
// tunable parameter at any point during the lifetime of this iterator. When
- // it does, it notifies `cond_var`.
+ // it does, it acquires `state->mu` and notifies `state->cond_var`.
void AddTunableParameter(IteratorContext* ctx, const string& name,
- std::atomic<int64>* value, int64 min, int64 max,
- condition_variable* cond_var) {
+ std::shared_ptr<model::SharedState> state, int64 min,
+ int64 max) {
if (ctx->model()) {
- ctx->model()->AddTunableParameter(prefix(), name, value, min, max,
- cond_var);
+ ctx->model()->AddTunableParameter(prefix(), name, std::move(state), min,
+ max);
}
}
diff --git a/tensorflow/core/framework/model.cc b/tensorflow/core/framework/model.cc
index b0330ec990..bfdb3a6658 100644
--- a/tensorflow/core/framework/model.cc
+++ b/tensorflow/core/framework/model.cc
@@ -296,12 +296,12 @@ void Model::AddProcessingTime(const string& name, int64 delta) {
void Model::AddTunableParameter(const string& node_name,
const string& parameter_name,
- std::atomic<int64>* value, int64 min, int64 max,
- condition_variable* cond_var) {
+ std::shared_ptr<SharedState> state, int64 min,
+ int64 max) {
tf_shared_lock l(mu_);
auto node = *gtl::FindOrNull(lookup_table_, node_name);
DCHECK(node);
- node->add_tunable_param(parameter_name, value, min, max, cond_var);
+ node->add_tunable_param(parameter_name, std::move(state), min, max);
}
// The optimization algorithm starts by setting all tunable parallelism
@@ -311,54 +311,55 @@ void Model::AddTunableParameter(const string& node_name,
// is less than or equal to the processing time needed to produce an element
// divided by CPU budget.
void Model::Optimize(int64 cpu_budget) {
- tf_shared_lock lock(mu_);
std::vector<std::shared_ptr<Model::Node::Tunable>> tunables;
- const int64 processing_time = ProcessingTime();
- tunables = CollectTunables();
- for (auto tunable : tunables) {
- tunable->value = 1;
- }
- while (true) {
- const int64 output_time = OutputTime();
- bool all_tunables = true;
- for (auto& tunable : tunables) {
- if (tunable->value < tunable->max) {
- all_tunables = false;
+ {
+ tf_shared_lock lock(mu_);
+ const int64 processing_time = ProcessingTime();
+ tunables = CollectTunables();
+ for (auto tunable : tunables) {
+ tunable->value = 1;
+ }
+ while (true) {
+ const int64 output_time = OutputTime();
+ bool all_tunables = true;
+ for (auto& tunable : tunables) {
+ if (tunable->value < tunable->max) {
+ all_tunables = false;
+ break;
+ }
+ }
+ if (output_time < processing_time / cpu_budget || all_tunables) {
break;
}
- }
- if (output_time < processing_time / cpu_budget || all_tunables) {
- break;
- }
- int64 best_delta = -1;
- Model::Node::Tunable* best_tunable = nullptr;
- for (auto& tunable : tunables) {
- if (tunable->value == tunable->max) {
- continue;
+ int64 best_delta = -1;
+ Model::Node::Tunable* best_tunable = nullptr;
+ for (auto& tunable : tunables) {
+ if (tunable->value == tunable->max) {
+ continue;
+ }
+ tunable->value++;
+ int64 delta = output_time - OutputTime();
+ if (delta > best_delta) {
+ best_delta = delta;
+ best_tunable = tunable.get();
+ }
+ tunable->value--;
}
- tunable->value++;
- int64 delta = output_time - OutputTime();
- if (delta > best_delta) {
- best_delta = delta;
- best_tunable = tunable.get();
+ if (!best_tunable) {
+ // NOTE: This can happen because we are performing the optimization
+ // while the model data is changing. If this becomes an issue, we should
+ // look into performing the optimization using a model snapshot.
+ break;
}
- tunable->value--;
+ best_tunable->value++;
}
- if (!best_tunable) {
- // NOTE: This can happen because we are performing the optimization
- // while the model data is changing. If this becomes an issue, we should
- // look into performing the optimization using a model snapshot.
- break;
- }
- best_tunable->value++;
}
VLOG(2) << "Number of knobs: " << tunables.size();
for (auto& tunable : tunables) {
VLOG(2) << "Setting tunable parameter: " << tunable->value;
- tunable->value_ptr->store(tunable->value);
- if (tunable->cond_var) {
- tunable->cond_var->notify_all();
- }
+ mutex_lock l(*tunable->state->mu);
+ tunable->state->value = tunable->value;
+ tunable->state->cond_var->notify_all();
}
}
diff --git a/tensorflow/core/framework/model.h b/tensorflow/core/framework/model.h
index 26402f5cd3..eae0fa70e8 100644
--- a/tensorflow/core/framework/model.h
+++ b/tensorflow/core/framework/model.h
@@ -33,6 +33,19 @@ namespace tensorflow {
namespace data {
namespace model {
+// Represents thread-safe state that can be shared between an input pipeline and
+// the performance model.
+struct SharedState {
+ public:
+ explicit SharedState(int64 value, std::shared_ptr<mutex> mu,
+ std::shared_ptr<condition_variable> cond_var)
+ : value(value), mu(std::move(mu)), cond_var(std::move(cond_var)) {}
+
+ std::shared_ptr<mutex> mu;
+ std::shared_ptr<condition_variable> cond_var;
+ int64 value;
+};
+
// Abstract representation of a TensorFlow input pipeline that can be used
// for collecting runtime information and optimizing performance. It collects
// runtime information about execution of the input pipeline that is used to
@@ -62,8 +75,8 @@ class Model {
// Adds a tunable parameter for the given node.
void AddTunableParameter(const string& node_name,
const string& parameter_name,
- std::atomic<int64>* value, int64 min, int64 max,
- condition_variable* cond_var) LOCKS_EXCLUDED(mu_);
+ std::shared_ptr<SharedState> value, int64 min,
+ int64 max) LOCKS_EXCLUDED(mu_);
// Runs optimization.
void Optimize(int64 cpu_budget) LOCKS_EXCLUDED(mu_);
@@ -109,13 +122,8 @@ class Model {
public:
// Represents a tunable parameter.
struct Tunable {
- Tunable(std::atomic<int64>* value, int64 min, int64 max,
- condition_variable* cond_var)
- : value(*value),
- min(min),
- max(max),
- value_ptr(value),
- cond_var(cond_var) {}
+ Tunable(std::shared_ptr<SharedState> state, int64 min, int64 max)
+ : value(state->value), min(min), max(max), state(std::move(state)) {}
// Identifies the model value of the parameter. This can be different from
// the actual value (e.g. during optimization search).
@@ -127,12 +135,8 @@ class Model {
// Identifies the maximum value of the parameter.
int64 max;
- // Points to the actual value of the parameter. Not owned.
- std::atomic<int64>* value_ptr;
-
- // If non-null, this condition variable is notified when the model updates
- // the actual value of the parameter (via `value_ptr`). Not owned.
- condition_variable* cond_var;
+ // Shared state of the parameter.
+ std::shared_ptr<SharedState> state;
};
Node(int64 id, const string& name, std::shared_ptr<Node> output)
@@ -158,12 +162,12 @@ class Model {
}
// Adds a tunable parameter.
- void add_tunable_param(const string& name, std::atomic<int64>* value,
- int64 min, int64 max, condition_variable* cond_var)
- LOCKS_EXCLUDED(mu_) {
+ void add_tunable_param(const string& name,
+ std::shared_ptr<SharedState> state, int64 min,
+ int64 max) LOCKS_EXCLUDED(mu_) {
mutex_lock l(mu_);
tunable_params_[name] =
- std::make_shared<Tunable>(value, min, max, cond_var);
+ std::make_shared<Tunable>(std::move(state), min, max);
}
// Returns the unique node ID.