author    Vijay Vasudevan <vrv@google.com>  2016-01-11 11:37:37 -0800
committer Vijay Vasudevan <vrv@google.com>  2016-01-11 11:37:37 -0800
commit    96aa23e45e72bd765920cc957090ff453e25cf64 (patch)
tree      f4c15c0ae3f5bb139329c2817d8b5cbb376115e5
parent    681db2caf1fd48dfc85500817cd40cfde78dc67a (diff)
TensorFlow: remove simple executor state and has_control_flow flag everywhere.
Change: 111871361
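For context, after this change every graph runs through ExecutorState, and callers construct LocalExecutorParams without the removed has_control_flow field. A minimal sketch of a post-change call site, condensed from the kernel_benchmark_testlib.cc hunk below (device, graph_def_version, and graph stand in for whatever the caller already has; error handling elided):

    LocalExecutorParams params;
    params.device = device;             // Device* that will run the kernels
    params.function_library = nullptr;  // no function library needed here
    params.create_kernel = [device, graph_def_version](const NodeDef& ndef,
                                                        OpKernel** kernel) {
      return CreateNonCachedKernel(device, nullptr, ndef, graph_def_version,
                                   kernel);
    };
    params.delete_kernel = [](OpKernel* kernel) {
      DeleteNonCachedKernel(kernel);
    };

    Executor* exec = nullptr;
    TF_CHECK_OK(NewLocalExecutor(params, graph, &exec));  // no control-flow flag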
-rw-r--r--  tensorflow/core/common_runtime/direct_session.cc            |  12
-rw-r--r--  tensorflow/core/common_runtime/executor.cc                  | 518
-rw-r--r--  tensorflow/core/common_runtime/executor.h                   |   3
-rw-r--r--  tensorflow/core/common_runtime/function.cc                  |   1
-rw-r--r--  tensorflow/core/common_runtime/kernel_benchmark_testlib.cc  |  34
5 files changed, 15 insertions(+), 553 deletions(-)
diff --git a/tensorflow/core/common_runtime/direct_session.cc b/tensorflow/core/common_runtime/direct_session.cc
index 5e57735fea..2bad722cc1 100644
--- a/tensorflow/core/common_runtime/direct_session.cc
+++ b/tensorflow/core/common_runtime/direct_session.cc
@@ -350,17 +350,6 @@ Status DirectSession::GetOrCreateExecutors(
return s;
}
- bool has_control_flow = false;
- for (const auto& graph : graphs) {
- for (const Node* n : graph.second->nodes()) {
- if (IsControlFlow(n)) {
- has_control_flow = true;
- break;
- }
- }
- if (has_control_flow) break;
- }
-
std::unique_ptr<ExecutorsAndKeys> ek(new ExecutorsAndKeys);
ek->func_defs = fdefs;
ek->items.reserve(graphs.size());
@@ -382,7 +371,6 @@ Status DirectSession::GetOrCreateExecutors(
NewFunctionLibraryRuntime(device, runner, graph_def_version, fdefs);
LocalExecutorParams params;
- params.has_control_flow = has_control_flow;
params.device = device;
params.function_library = item->flib;
auto lib = item->flib;
diff --git a/tensorflow/core/common_runtime/executor.cc b/tensorflow/core/common_runtime/executor.cc
index 0537294ceb..e42d2e260f 100644
--- a/tensorflow/core/common_runtime/executor.cc
+++ b/tensorflow/core/common_runtime/executor.cc
@@ -227,7 +227,6 @@ class ExecutorImpl : public Executor {
private:
friend class ExecutorState;
- friend class SimpleExecutorState;
// Owned.
LocalExecutorParams params_;
@@ -275,9 +274,6 @@ Status ExecutorImpl::Initialize() {
++frame_input_count_[frame_name];
}
}
- if (params_.has_control_flow) {
- VLOG(2) << "Graph has control flow.";
- }
if (!s.ok()) return s;
return SetAllocAttrs();
}
@@ -1612,520 +1608,8 @@ void ExecutorState::CleanupFramesIterations(FrameState* frame, int64 iter,
}
}
-// When ExecutorImpl graph has no control flow nodes,
-// SimpleExecutorState is used instead of ExecutorState. It maintains
-// fewer internal state and is convenient for experimenting with async
-// op kernels.
-class SimpleExecutorState {
- public:
- SimpleExecutorState(const Executor::Args& args, ExecutorImpl* impl);
- ~SimpleExecutorState() {
- for (auto it : device_context_map_) {
- it.second->Unref();
- }
- delete slice_reader_cache_;
- }
- void RunAsync(Executor::DoneCallback done);
-
- private:
- typedef SimpleExecutorState ME;
-
- // Not owned.
- Rendezvous* rendezvous_;
- StepStatsCollector* stats_collector_;
- checkpoint::TensorSliceReaderCacheWrapper* slice_reader_cache_;
- FunctionCallFrame* call_frame_;
- const ExecutorImpl* impl_;
- CancellationManager* cancellation_manager_;
- Executor::Args::Runner runner_;
-
- // Owned.
-
- // i-th node's j-th input is in tensors_[impl_->nodes[i].input_start
- // + j]. The output is either a tensor pointer (pass-by-reference)
- // or a tensor (pass-by-value).
- //
- // NOTE: Not protected by mu_ because tensors_ is resized once. Each
- // element of tensors_ is written once by the source node of an edge
- // and is cleared by the destination of the same edge. The latter
- // node is never run concurrently with the former node.
- struct Entry {
- Tensor val = *kEmptyTensor; // A tensor value.
- Tensor* ref = nullptr; // A tensor reference.
- mutex* ref_mu = nullptr; // mutex for *ref if ref is not nullptr.
-
- // Every entry carries an optional DeviceContext containing
- // Device-specific information about how the Tensor was produced.
- DeviceContext* device_context = nullptr;
-
- // The attributes of the allocator that creates the tensor.
- AllocatorAttributes alloc_attr;
- };
-
- // Contains a map from node id to the DeviceContext object that was
- // assigned by the device at the beginning of a step.
- DeviceContextMap device_context_map_;
-
- std::vector<Entry> input_tensors_;
-
- // Step-local resource manager.
- ResourceMgr step_resource_manager_;
-
- // Invoked when the execution finishes.
- Executor::DoneCallback done_cb_;
-
- // How many active threads of computation are being used. Same as
- // the number of pending Process() functions.
- std::atomic_int_fast32_t num_active_;
-
- mutex mu_;
- Status status_ GUARDED_BY(mu_);
-
- // i-th kernel is still waiting for pending[i] inputs.
- class CountDown {
- public:
- CountDown() : v_(0) {}
- void Set(int32 v) { v_.store(v); }
- bool Dec() {
- return v_.load(std::memory_order_acquire) == 1 || v_.fetch_sub(1) == 1;
- }
-
- private:
- std::atomic_int_fast32_t v_;
- };
- std::vector<CountDown> pending_;
-
- // Process Node identified by "id" in current thread. "scheduled_usec"
- // indicates when the node becomes ready and gets scheduled.
- void Process(int id, int64 scheduled_usec);
-
- // Before invoking item->kernel, fills in its "inputs".
- Status PrepareInputs(const NodeItem& item, TensorValueVec* inputs,
- DeviceContextVec* input_device_contexts);
-
- // After item->kernel computation is done, processes its outputs
- // and returns nodes that become "ready".
- typedef gtl::InlinedVector<int, 8> ReadyNodeIds;
- Status ProcessOutputs(const NodeItem& item, OpKernelContext* ctx,
- ReadyNodeIds* ready, NodeExecStats* stats);
-
- // "node" just finishes. Takes ownership of "stats". Returns true if
- // execution has completed.
- bool NodeDone(const Status& s, const Node* node, const ReadyNodeIds& ready,
- NodeExecStats* stats, std::deque<int>* inline_ready);
-
- // Call Process() on all nodes in 'inline_ready'.
- void ProcessInline(const std::deque<int>& inline_ready);
-
- // Schedule all the expensive nodes in 'ready', and put all the inexpensive
- // nodes in 'ready' into 'inline_ready'.
- void ScheduleReady(const ReadyNodeIds& ready, std::deque<int>* inline_ready);
-
- // One thread of control finishes.
- void Finish();
-
- TF_DISALLOW_COPY_AND_ASSIGN(SimpleExecutorState);
-};
-
-SimpleExecutorState::SimpleExecutorState(const Executor::Args& args,
- ExecutorImpl* impl)
- : rendezvous_(args.rendezvous),
- stats_collector_(args.stats_collector),
- slice_reader_cache_(new checkpoint::TensorSliceReaderCacheWrapper),
- call_frame_(args.call_frame),
- impl_(impl),
- cancellation_manager_(args.cancellation_manager),
- runner_(args.runner),
- num_active_(0),
- pending_(impl_->nodes_.size()) {}
-
-void SimpleExecutorState::ProcessInline(const std::deque<int>& inline_ready) {
- if (inline_ready.empty()) return;
- int64 scheduled_usec = 0;
- if (stats_collector_) {
- scheduled_usec = nodestats::NowInUsec();
- }
- for (int id : inline_ready) {
- Process(id, scheduled_usec);
- }
-}
-
-void SimpleExecutorState::ScheduleReady(const ReadyNodeIds& ready,
- std::deque<int>* inline_ready) {
- if (ready.empty()) return;
-
- int64 scheduled_usec = 0;
- if (stats_collector_) {
- scheduled_usec = nodestats::NowInUsec();
- }
- if (inline_ready == nullptr) {
- // Schedule to run all the ready ops in thread pool.
- for (auto id : ready) {
- runner_(std::bind(&ME::Process, this, id, scheduled_usec));
- }
- return;
- }
- const std::vector<NodeItem>& nodes = impl_->nodes_;
- int curr_expensive_node = -1;
- for (auto id : ready) {
- if (!nodes[id].kernel->IsExpensive()) {
- // Inline this inexpensive node.
- inline_ready->push_back(id);
- } else {
- if (curr_expensive_node != -1) {
- // Dispatch to another thread since there is plenty of work to
- // do for this thread.
- runner_(
- std::bind(&ME::Process, this, curr_expensive_node, scheduled_usec));
- }
- curr_expensive_node = id;
- }
- }
- if (curr_expensive_node != -1) {
- if (inline_ready->empty()) {
- // Tail recursion optimization
- inline_ready->push_back(curr_expensive_node);
- } else {
- // There are inline nodes to run already. We dispatch this expensive
- // node to other thread.
- runner_(
- std::bind(&ME::Process, this, curr_expensive_node, scheduled_usec));
- }
- }
-}
-
-void SimpleExecutorState::RunAsync(Executor::DoneCallback done) {
- const Graph* graph = impl_->graph_;
- ReadyNodeIds ready;
-
- // Ask the device to fill in the device context map.
- Device* device = impl_->params_.device;
- device->FillContextMap(graph, &device_context_map_);
-
- for (const Node* n : graph->nodes()) {
- const int id = n->id();
- const int num_in_edges = n->in_edges().size();
- pending_[id].Set(num_in_edges);
- if (num_in_edges == 0) {
- ready.push_back(id);
- }
- }
- if (ready.empty()) {
- done(Status::OK());
- } else {
- num_active_ = ready.size();
- done_cb_ = done;
- input_tensors_.resize(impl_->total_tensors_);
- // Schedule to run all the ready ops in thread pool.
- ScheduleReady(ready, nullptr);
- }
-}
-
-Status SimpleExecutorState::PrepareInputs(
- const NodeItem& item, TensorValueVec* inputs,
- DeviceContextVec* input_device_contexts) {
- const Node* node = item.node;
-
- inputs->clear();
- inputs->resize(node->num_inputs());
- input_device_contexts->clear();
- input_device_contexts->resize(node->num_inputs());
-
- for (int i = 0; i < node->num_inputs(); ++i) {
- const bool expect_ref = IsRefType(node->input_type(i));
- Entry* entry = input_tensors_.data() + item.input_start + i;
- (*input_device_contexts)[i] = entry->device_context;
-
- // i-th input.
- TensorValue* inp = &(*inputs)[i];
-
- if (entry->ref == nullptr) {
- if (expect_ref) {
- return AttachDef(
- errors::InvalidArgument(i, "-th input expects a ref type"),
- item.kernel->def());
- }
- inp->tensor = &entry->val;
- } else {
- if (!entry->ref->IsInitialized() && !IsInitializationOp(item.node)) {
- return AttachDef(
- errors::FailedPrecondition("Attempting to use uninitialized value ",
- item.kernel->def().input(i)),
- item.kernel->def());
- }
- if (expect_ref) {
- inp->mutex_if_ref = entry->ref_mu;
- inp->tensor = entry->ref;
- } else {
- // Automatically deref the tensor ref when the op expects a
- // tensor but is given a ref to a tensor. Need to deref it
- // under the mutex.
- {
- mutex_lock l(*(entry->ref_mu));
- entry->val = *entry->ref;
- }
- inp->tensor = &entry->val;
- }
- }
- }
- return Status::OK();
-}
-
-void SimpleExecutorState::Process(int id, int64 scheduled_usec) {
- const std::vector<NodeItem>& nodes = impl_->nodes_;
- ReadyNodeIds ready;
- std::deque<int> inline_ready;
-
- // Parameters passed to OpKernel::Compute.
- TensorValueVec inputs;
- DeviceContextVec input_device_contexts;
-
- OpKernelContext::Params params;
- Device* device = impl_->params_.device;
- params.device = device;
- // track allocations if and only if we are collecting statistics
- params.track_allocations = (stats_collector_ != nullptr);
- params.rendezvous = rendezvous_;
- params.cancellation_manager = cancellation_manager_;
- params.call_frame = call_frame_;
- params.function_library = impl_->params_.function_library;
- params.resource_manager = device->resource_manager();
- params.step_resource_manager = &step_resource_manager_;
- params.slice_reader_cache = slice_reader_cache_;
- params.inputs = &inputs;
- params.input_device_contexts = &input_device_contexts;
- params.frame_iter = FrameAndIter(0, 0);
-
- Status s;
- NodeExecStats* stats = nullptr;
- bool completed = false;
- inline_ready.push_back(id);
- while (!inline_ready.empty()) {
- id = inline_ready.front();
- inline_ready.pop_front();
- const NodeItem& item = nodes[id];
- const Node* node = item.node;
-
- // Set the device_context for this node id, if it exists.
- auto dc_it = device_context_map_.find(id);
- if (dc_it != device_context_map_.end()) {
- params.op_device_context = dc_it->second;
- }
-
- if (stats_collector_) {
- stats = new NodeExecStats;
- stats->set_node_name(node->name());
- nodestats::SetScheduled(stats, scheduled_usec);
- nodestats::SetAllStart(stats);
- }
-
- VLOG(1) << "Process node: " << id << " " << SummarizeNodeDef(node->def());
-
- // Prepares inputs.
- s = PrepareInputs(item, &inputs, &input_device_contexts);
- if (!s.ok()) {
- // Continue to process the nodes in 'inline_ready'.
- completed = NodeDone(s, item.node, ready, stats, &inline_ready);
- continue;
- }
-
- OpKernel* op_kernel = item.kernel;
- params.op_kernel = op_kernel;
- params.output_alloc_attr = [this, node, op_kernel](int index) {
- return OutputAttributes(&impl_->alloc_attr_, node, op_kernel, index);
- };
-
- // Asynchronous computes.
- AsyncOpKernel* async = op_kernel->AsAsync();
- if (async) {
- auto pcopy = CopyParams(params);
- auto ctx = new OpKernelContext(*pcopy);
- auto done = [this, item, ctx, stats, pcopy]() {
- VLOG(2) << this
- << " Async kernel done: " << SummarizeNodeDef(item.node->def());
- if (stats_collector_) nodestats::SetOpEnd(stats);
- ReadyNodeIds ready;
- Status s = ProcessOutputs(item, ctx, &ready, stats);
- if (stats_collector_) nodestats::SetMemory(stats, ctx);
- // Schedule to run all the ready ops in thread pool.
- bool completed = NodeDone(s, item.node, ready, stats, nullptr);
- delete ctx;
- DeleteParams(pcopy);
- if (completed) Finish();
- };
- if (stats_collector_) nodestats::SetOpStart(stats);
- device->ComputeAsync(async, ctx, done);
- } else {
- // Synchronous computes.
- OpKernelContext ctx(params);
- if (stats_collector_) nodestats::SetOpStart(stats);
- device->Compute(CHECK_NOTNULL(op_kernel), &ctx);
- if (stats_collector_) nodestats::SetOpEnd(stats);
-
- s = ProcessOutputs(item, &ctx, &ready, stats);
- if (stats_collector_) nodestats::SetMemory(stats, &ctx);
- if (stats_collector_) {
- scheduled_usec = nodestats::NowInUsec();
- }
- completed = NodeDone(s, node, ready, stats, &inline_ready);
- }
- } // while !inline_ready.empty()
-
- // This thread of computation is done if completed = true.
- if (completed) Finish();
-}
-
-bool SimpleExecutorState::NodeDone(const Status& s, const Node* node,
- const ReadyNodeIds& ready,
- NodeExecStats* stats,
- std::deque<int>* inline_ready) {
- if (stats_collector_) {
- nodestats::SetAllEnd(stats);
- if (!SetTimelineLabel(node, stats)) {
- // Only record non-transfer nodes.
- stats_collector_->Save(impl_->params_.device->name(), stats);
- } else {
- delete stats;
- }
- }
-
- Rendezvous* captured_rendezvous = nullptr; // Will be set on error.
- if (!s.ok()) {
- // Some error happened. This thread of computation is done.
- mutex_lock l(mu_);
- if (status_.ok()) {
- captured_rendezvous = rendezvous_;
- if (captured_rendezvous) captured_rendezvous->Ref();
- status_ = s;
- }
- }
- if (captured_rendezvous) {
- // If we captured the rendezvous_ pointer, we are in an error condition.
- // Use captured_rendezvous, in case "this" is deleted by another thread.
- TRACEPRINTF("StartAbort: %s", s.ToString().c_str());
- captured_rendezvous->StartAbort(s);
- captured_rendezvous->Unref();
- }
-
- bool completed = false;
- int ready_size = ready.size();
- if (ready_size == 0 || !s.ok()) {
- completed = (num_active_.fetch_sub(1) == 1);
- } else if (ready_size > 1) {
- num_active_.fetch_add(ready_size - 1, std::memory_order_relaxed);
- }
-
- // Schedule the ready nodes in 'ready'.
- if (s.ok()) {
- ScheduleReady(ready, inline_ready);
- }
- return completed;
-}
-
-void SimpleExecutorState::Finish() {
- mu_.lock();
- auto ret = status_;
- auto done_cb = done_cb_;
- auto runner = runner_;
- mu_.unlock();
- delete this;
- CHECK(done_cb != nullptr);
- runner([done_cb, ret]() { done_cb(ret); });
-}
-
-Status SimpleExecutorState::ProcessOutputs(const NodeItem& item,
- OpKernelContext* ctx,
- ReadyNodeIds* ready,
- NodeExecStats* stats) {
- Status s = ctx->status();
- if (!s.ok()) {
- s = AttachDef(s, item.kernel->def());
- LOG(WARNING) << this << " Compute status: " << s;
- return s;
- }
-
- // Processes outputs.
- gtl::InlinedVector<Entry, 4> outputs;
- const Node* node = item.node;
- outputs.resize(node->num_outputs());
-
- // Get the device_context for this node id, if it exists.
- DeviceContext* device_context = nullptr;
- auto dc_it = device_context_map_.find(node->id());
- if (dc_it != device_context_map_.end()) {
- device_context = dc_it->second;
- }
-
- for (int i = 0; i < node->num_outputs(); ++i) {
- TensorValue val = ctx->release_output(i);
- // Sanity check of output tensor types.
- DataType dtype = val->dtype();
- if (val.is_ref()) dtype = MakeRefType(dtype);
- if (dtype == node->output_type(i)) {
- Entry* out = &(outputs[i]);
- if (val.is_ref()) {
- out->ref = val.tensor;
- out->ref_mu = val.mutex_if_ref;
- } else {
- out->val = *val.tensor;
- }
-
- // Set the device context of the output entry.
- out->device_context = device_context;
-
- // Set the allocator attributes of the output entry.
- out->alloc_attr = ctx->output_alloc_attr(i);
-
- if (stats_collector_ && val.tensor->IsInitialized()) {
- nodestats::SetOutput(stats, i, ctx->output_allocation_type(i),
- val.tensor);
- }
- } else {
- s.Update(
- errors::Internal("Output ", i, " of type ", DataTypeString(dtype),
- " does not match declared output type ",
- DataTypeString(node->output_type(i)),
- " for operation ", SummarizeNodeDef(node->def())));
- }
- if (!val.is_ref()) {
- // If OpKernelContext returns outputs via pass-by-value, we
- // don't need this trouble.
- delete val.tensor;
- }
- }
- if (!s.ok()) return s;
-
- // Clears inputs.
- for (int i = 0; i < node->num_inputs(); ++i) {
- input_tensors_[item.input_start + i].val = *kEmptyTensor;
- }
-
- // Propagates outputs along out edges.
- ready->clear();
- const std::vector<NodeItem>& nodes = impl_->nodes_;
- for (const Edge* e : node->out_edges()) {
- const int src_slot = e->src_output();
- const int dst_id = e->dst()->id();
- const NodeItem& dst_item = nodes[dst_id];
- if (!e->IsControlEdge()) {
- const int dst_slot = e->dst_input();
- input_tensors_[dst_item.input_start + dst_slot] = outputs[src_slot];
- }
- if (pending_[dst_id].Dec()) {
- ready->push_back(dst_id);
- }
- }
- return Status::OK();
-}
-
-// NOTE(yuanbyu): Use the executor that supports control flow by default.
-const bool use_control_flow_executor = true;
void ExecutorImpl::RunAsync(const Args& args, DoneCallback done) {
- if (params_.has_control_flow || use_control_flow_executor) {
- (new ExecutorState(args, this))->RunAsync(done);
- } else {
- (new SimpleExecutorState(args, this))->RunAsync(done);
- }
+ (new ExecutorState(args, this))->RunAsync(done);
}
} // end namespace
diff --git a/tensorflow/core/common_runtime/executor.h b/tensorflow/core/common_runtime/executor.h
index 00625fce60..8d98c7a11e 100644
--- a/tensorflow/core/common_runtime/executor.h
+++ b/tensorflow/core/common_runtime/executor.h
@@ -114,9 +114,6 @@ struct LocalExecutorParams {
// The library runtime support.
FunctionLibraryRuntime* function_library;
- // True iff the computation contains control flow nodes.
- bool has_control_flow;
-
// create_kernel returns an instance of op kernel based on NodeDef.
// delete_kernel is called for every kernel used by the executor
// when the executor is deleted.
diff --git a/tensorflow/core/common_runtime/function.cc b/tensorflow/core/common_runtime/function.cc
index 670774b67e..e086098ff5 100644
--- a/tensorflow/core/common_runtime/function.cc
+++ b/tensorflow/core/common_runtime/function.cc
@@ -551,7 +551,6 @@ Status FunctionLibraryRuntimeImpl::CreateItem(Handle handle, Item** item) {
LocalExecutorParams params;
params.device = device_;
params.function_library = this;
- params.has_control_flow = false;
params.create_kernel = create_kernel_;
params.delete_kernel = [](OpKernel* kernel) {
DeleteNonCachedKernel(kernel);
diff --git a/tensorflow/core/common_runtime/kernel_benchmark_testlib.cc b/tensorflow/core/common_runtime/kernel_benchmark_testlib.cc
index 2aec2168c9..e898f7ec84 100644
--- a/tensorflow/core/common_runtime/kernel_benchmark_testlib.cc
+++ b/tensorflow/core/common_runtime/kernel_benchmark_testlib.cc
@@ -70,18 +70,21 @@ Benchmark::Benchmark(const string& device, Graph* g,
const int graph_def_version = g->version();
+ LocalExecutorParams params;
+ params.device = device_;
+ params.function_library = nullptr;
+ params.create_kernel = [this, graph_def_version](const NodeDef& ndef,
+ OpKernel** kernel) {
+ return CreateNonCachedKernel(device_, nullptr, ndef, graph_def_version,
+ kernel);
+ };
+ params.delete_kernel = [](OpKernel* kernel) {
+ DeleteNonCachedKernel(kernel);
+ };
+
if (init) {
Executor* init_exec;
- TF_CHECK_OK(NewLocalExecutor(
- {
- device_, nullptr, false,
- [this, graph_def_version](const NodeDef& ndef, OpKernel** kernel) {
- return CreateNonCachedKernel(device_, nullptr, ndef,
- graph_def_version, kernel);
- },
- [](OpKernel* kernel) { DeleteNonCachedKernel(kernel); },
- },
- init, &init_exec));
+ TF_CHECK_OK(NewLocalExecutor(params, init, &init_exec));
Executor::Args args;
args.rendezvous = rendez_;
args.runner = runner;
@@ -89,16 +92,7 @@ Benchmark::Benchmark(const string& device, Graph* g,
delete init_exec;
}
- TF_CHECK_OK(NewLocalExecutor(
- {
- device_, nullptr, false,
- [this, graph_def_version](const NodeDef& ndef, OpKernel** kernel) {
- return CreateNonCachedKernel(device_, nullptr, ndef,
- graph_def_version, kernel);
- },
- [](OpKernel* kernel) { DeleteNonCachedKernel(kernel); },
- },
- g, &exec_));
+ TF_CHECK_OK(NewLocalExecutor(params, g, &exec_));
}
Benchmark::~Benchmark() {