author    | 2016-01-11 11:37:37 -0800
committer | 2016-01-11 11:37:37 -0800
commit    | 96aa23e45e72bd765920cc957090ff453e25cf64 (patch)
tree      | f4c15c0ae3f5bb139329c2817d8b5cbb376115e5
parent    | 681db2caf1fd48dfc85500817cd40cfde78dc67a (diff)
TensorFlow: remove simple executor state and has_control_flow flag everywhere.
Change: 111871361
-rw-r--r-- | tensorflow/core/common_runtime/direct_session.cc           |  12 |
-rw-r--r-- | tensorflow/core/common_runtime/executor.cc                 | 518 |
-rw-r--r-- | tensorflow/core/common_runtime/executor.h                  |   3 |
-rw-r--r-- | tensorflow/core/common_runtime/function.cc                 |   1 |
-rw-r--r-- | tensorflow/core/common_runtime/kernel_benchmark_testlib.cc |  34 |

5 files changed, 15 insertions, 553 deletions
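As the commit message says, `SimpleExecutorState` (the executor used for graphs without control flow) is deleted, and `ExecutorImpl::RunAsync` now unconditionally creates an `ExecutorState`, so callers configure `LocalExecutorParams` without a `has_control_flow` field. The sketch below illustrates the simplified setup, modeled on the new code in `kernel_benchmark_testlib.cc` in this commit; the helper name `BuildExecutor` and the standalone framing are assumptions for illustration, not part of the commit, while `NewLocalExecutor`, `CreateNonCachedKernel`, and `DeleteNonCachedKernel` appear in the diff itself:

```cpp
#include "tensorflow/core/common_runtime/executor.h"
#include "tensorflow/core/graph/graph.h"

namespace tensorflow {

// Hypothetical helper showing executor setup after this commit:
// LocalExecutorParams no longer carries has_control_flow, so the same
// params work for graphs with or without control-flow nodes.
Executor* BuildExecutor(Device* device, const Graph* graph,
                        int graph_def_version) {
  LocalExecutorParams params;
  params.device = device;
  params.function_library = nullptr;  // No function library in this sketch.
  params.create_kernel = [device, graph_def_version](const NodeDef& ndef,
                                                     OpKernel** kernel) {
    return CreateNonCachedKernel(device, nullptr, ndef, graph_def_version,
                                 kernel);
  };
  params.delete_kernel = [](OpKernel* kernel) {
    DeleteNonCachedKernel(kernel);
  };
  Executor* exec = nullptr;
  // After this change, this always dispatches to ExecutorState.
  TF_CHECK_OK(NewLocalExecutor(params, graph, &exec));
  return exec;
}

}  // namespace tensorflow
```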
```diff
diff --git a/tensorflow/core/common_runtime/direct_session.cc b/tensorflow/core/common_runtime/direct_session.cc
index 5e57735fea..2bad722cc1 100644
--- a/tensorflow/core/common_runtime/direct_session.cc
+++ b/tensorflow/core/common_runtime/direct_session.cc
@@ -350,17 +350,6 @@ Status DirectSession::GetOrCreateExecutors(
     return s;
   }

-  bool has_control_flow = false;
-  for (const auto& graph : graphs) {
-    for (const Node* n : graph.second->nodes()) {
-      if (IsControlFlow(n)) {
-        has_control_flow = true;
-        break;
-      }
-    }
-    if (has_control_flow) break;
-  }
-
   std::unique_ptr<ExecutorsAndKeys> ek(new ExecutorsAndKeys);
   ek->func_defs = fdefs;
   ek->items.reserve(graphs.size());
@@ -382,7 +371,6 @@ Status DirectSession::GetOrCreateExecutors(
         NewFunctionLibraryRuntime(device, runner, graph_def_version, fdefs);

     LocalExecutorParams params;
-    params.has_control_flow = has_control_flow;
     params.device = device;
     params.function_library = item->flib;
     auto lib = item->flib;
diff --git a/tensorflow/core/common_runtime/executor.cc b/tensorflow/core/common_runtime/executor.cc
index 0537294ceb..e42d2e260f 100644
--- a/tensorflow/core/common_runtime/executor.cc
+++ b/tensorflow/core/common_runtime/executor.cc
@@ -227,7 +227,6 @@ class ExecutorImpl : public Executor {

  private:
   friend class ExecutorState;
-  friend class SimpleExecutorState;

   // Owned.
   LocalExecutorParams params_;
@@ -275,9 +274,6 @@ Status ExecutorImpl::Initialize() {
       ++frame_input_count_[frame_name];
     }
   }
-  if (params_.has_control_flow) {
-    VLOG(2) << "Graph has control flow.";
-  }
   if (!s.ok()) return s;
   return SetAllocAttrs();
 }
@@ -1612,520 +1608,8 @@ void ExecutorState::CleanupFramesIterations(FrameState* frame, int64 iter,
   }
 }

-// When ExecutorImpl graph has no control flow nodes,
-// SimpleExecutorState is used instead of ExecutorState. It maintains
-// fewer internal state and is convenient for experimenting with async
-// op kernels.
-class SimpleExecutorState {
- public:
-  SimpleExecutorState(const Executor::Args& args, ExecutorImpl* impl);
-  ~SimpleExecutorState() {
-    for (auto it : device_context_map_) {
-      it.second->Unref();
-    }
-    delete slice_reader_cache_;
-  }
-  void RunAsync(Executor::DoneCallback done);
-
- private:
-  typedef SimpleExecutorState ME;
-
-  // Not owned.
-  Rendezvous* rendezvous_;
-  StepStatsCollector* stats_collector_;
-  checkpoint::TensorSliceReaderCacheWrapper* slice_reader_cache_;
-  FunctionCallFrame* call_frame_;
-  const ExecutorImpl* impl_;
-  CancellationManager* cancellation_manager_;
-  Executor::Args::Runner runner_;
-
-  // Owned.
-
-  // i-th node's j-th input is in tensors_[impl_->nodes[i].input_start
-  // + j]. The output is either a tensor pointer (pass-by-reference)
-  // or a tensor (pass-by-value).
-  //
-  // NOTE: Not protected by mu_ because tensors_ is resized once. Each
-  // element of tensors_ is written once by the source node of an edge
-  // and is cleared by the destination of the same edge. The latter
-  // node is never run concurrently with the former node.
-  struct Entry {
-    Tensor val = *kEmptyTensor;  // A tensor value.
-    Tensor* ref = nullptr;       // A tensor reference.
-    mutex* ref_mu = nullptr;     // mutex for *ref if ref is not nullptr.
-
-    // Every entry carries an optional DeviceContext containing
-    // Device-specific information about how the Tensor was produced.
-    DeviceContext* device_context = nullptr;
-
-    // The attributes of the allocator that creates the tensor.
-    AllocatorAttributes alloc_attr;
-  };
-
-  // Contains a map from node id to the DeviceContext object that was
-  // assigned by the device at the beginning of a step.
-  DeviceContextMap device_context_map_;
-
-  std::vector<Entry> input_tensors_;
-
-  // Step-local resource manager.
-  ResourceMgr step_resource_manager_;
-
-  // Invoked when the execution finishes.
-  Executor::DoneCallback done_cb_;
-
-  // How many active threads of computation are being used. Same as
-  // the number of pending Process() functions.
-  std::atomic_int_fast32_t num_active_;
-
-  mutex mu_;
-  Status status_ GUARDED_BY(mu_);
-
-  // i-th kernel is still waiting for pending[i] inputs.
-  class CountDown {
-   public:
-    CountDown() : v_(0) {}
-    void Set(int32 v) { v_.store(v); }
-    bool Dec() {
-      return v_.load(std::memory_order_acquire) == 1 || v_.fetch_sub(1) == 1;
-    }
-
-   private:
-    std::atomic_int_fast32_t v_;
-  };
-  std::vector<CountDown> pending_;
-
-  // Process Node identified by "id" in current thread. "scheduled_usec"
-  // indicates when the node becomes ready and gets scheduled.
-  void Process(int id, int64 scheduled_usec);
-
-  // Before invoking item->kernel, fills in its "inputs".
-  Status PrepareInputs(const NodeItem& item, TensorValueVec* inputs,
-                       DeviceContextVec* input_device_contexts);
-
-  // After item->kernel computation is done, processes its outputs
-  // and returns nodes that become "ready".
-  typedef gtl::InlinedVector<int, 8> ReadyNodeIds;
-  Status ProcessOutputs(const NodeItem& item, OpKernelContext* ctx,
-                        ReadyNodeIds* ready, NodeExecStats* stats);
-
-  // "node" just finishes. Takes ownership of "stats". Returns true if
-  // execution has completed.
-  bool NodeDone(const Status& s, const Node* node, const ReadyNodeIds& ready,
-                NodeExecStats* stats, std::deque<int>* inline_ready);
-
-  // Call Process() on all nodes in 'inline_ready'.
-  void ProcessInline(const std::deque<int>& inline_ready);
-
-  // Schedule all the expensive nodes in 'ready', and put all the inexpensive
-  // nodes in 'ready' into 'inline_ready'.
-  void ScheduleReady(const ReadyNodeIds& ready, std::deque<int>* inline_ready);
-
-  // One thread of control finishes.
-  void Finish();
-
-  TF_DISALLOW_COPY_AND_ASSIGN(SimpleExecutorState);
-};
-
-SimpleExecutorState::SimpleExecutorState(const Executor::Args& args,
-                                         ExecutorImpl* impl)
-    : rendezvous_(args.rendezvous),
-      stats_collector_(args.stats_collector),
-      slice_reader_cache_(new checkpoint::TensorSliceReaderCacheWrapper),
-      call_frame_(args.call_frame),
-      impl_(impl),
-      cancellation_manager_(args.cancellation_manager),
-      runner_(args.runner),
-      num_active_(0),
-      pending_(impl_->nodes_.size()) {}
-
-void SimpleExecutorState::ProcessInline(const std::deque<int>& inline_ready) {
-  if (inline_ready.empty()) return;
-  int64 scheduled_usec = 0;
-  if (stats_collector_) {
-    scheduled_usec = nodestats::NowInUsec();
-  }
-  for (int id : inline_ready) {
-    Process(id, scheduled_usec);
-  }
-}
-
-void SimpleExecutorState::ScheduleReady(const ReadyNodeIds& ready,
-                                        std::deque<int>* inline_ready) {
-  if (ready.empty()) return;
-
-  int64 scheduled_usec = 0;
-  if (stats_collector_) {
-    scheduled_usec = nodestats::NowInUsec();
-  }
-  if (inline_ready == nullptr) {
-    // Schedule to run all the ready ops in thread pool.
-    for (auto id : ready) {
-      runner_(std::bind(&ME::Process, this, id, scheduled_usec));
-    }
-    return;
-  }
-  const std::vector<NodeItem>& nodes = impl_->nodes_;
-  int curr_expensive_node = -1;
-  for (auto id : ready) {
-    if (!nodes[id].kernel->IsExpensive()) {
-      // Inline this inexpensive node.
-      inline_ready->push_back(id);
-    } else {
-      if (curr_expensive_node != -1) {
-        // Dispatch to another thread since there is plenty of work to
-        // do for this thread.
-        runner_(
-            std::bind(&ME::Process, this, curr_expensive_node, scheduled_usec));
-      }
-      curr_expensive_node = id;
-    }
-  }
-  if (curr_expensive_node != -1) {
-    if (inline_ready->empty()) {
-      // Tail recursion optimization
-      inline_ready->push_back(curr_expensive_node);
-    } else {
-      // There are inline nodes to run already. We dispatch this expensive
-      // node to other thread.
-      runner_(
-          std::bind(&ME::Process, this, curr_expensive_node, scheduled_usec));
-    }
-  }
-}
-
-void SimpleExecutorState::RunAsync(Executor::DoneCallback done) {
-  const Graph* graph = impl_->graph_;
-  ReadyNodeIds ready;
-
-  // Ask the device to fill in the device context map.
-  Device* device = impl_->params_.device;
-  device->FillContextMap(graph, &device_context_map_);
-
-  for (const Node* n : graph->nodes()) {
-    const int id = n->id();
-    const int num_in_edges = n->in_edges().size();
-    pending_[id].Set(num_in_edges);
-    if (num_in_edges == 0) {
-      ready.push_back(id);
-    }
-  }
-  if (ready.empty()) {
-    done(Status::OK());
-  } else {
-    num_active_ = ready.size();
-    done_cb_ = done;
-    input_tensors_.resize(impl_->total_tensors_);
-    // Schedule to run all the ready ops in thread pool.
-    ScheduleReady(ready, nullptr);
-  }
-}
-
-Status SimpleExecutorState::PrepareInputs(
-    const NodeItem& item, TensorValueVec* inputs,
-    DeviceContextVec* input_device_contexts) {
-  const Node* node = item.node;
-
-  inputs->clear();
-  inputs->resize(node->num_inputs());
-  input_device_contexts->clear();
-  input_device_contexts->resize(node->num_inputs());
-
-  for (int i = 0; i < node->num_inputs(); ++i) {
-    const bool expect_ref = IsRefType(node->input_type(i));
-    Entry* entry = input_tensors_.data() + item.input_start + i;
-    (*input_device_contexts)[i] = entry->device_context;
-
-    // i-th input.
-    TensorValue* inp = &(*inputs)[i];
-
-    if (entry->ref == nullptr) {
-      if (expect_ref) {
-        return AttachDef(
-            errors::InvalidArgument(i, "-th input expects a ref type"),
-            item.kernel->def());
-      }
-      inp->tensor = &entry->val;
-    } else {
-      if (!entry->ref->IsInitialized() && !IsInitializationOp(item.node)) {
-        return AttachDef(
-            errors::FailedPrecondition("Attempting to use uninitialized value ",
-                                       item.kernel->def().input(i)),
-            item.kernel->def());
-      }
-      if (expect_ref) {
-        inp->mutex_if_ref = entry->ref_mu;
-        inp->tensor = entry->ref;
-      } else {
-        // Automatically deref the tensor ref when the op expects a
-        // tensor but is given a ref to a tensor. Need to deref it
-        // under the mutex.
-        {
-          mutex_lock l(*(entry->ref_mu));
-          entry->val = *entry->ref;
-        }
-        inp->tensor = &entry->val;
-      }
-    }
-  }
-  return Status::OK();
-}
-
-void SimpleExecutorState::Process(int id, int64 scheduled_usec) {
-  const std::vector<NodeItem>& nodes = impl_->nodes_;
-  ReadyNodeIds ready;
-  std::deque<int> inline_ready;
-
-  // Parameters passed to OpKernel::Compute.
-  TensorValueVec inputs;
-  DeviceContextVec input_device_contexts;
-
-  OpKernelContext::Params params;
-  Device* device = impl_->params_.device;
-  params.device = device;
-  // track allocations if and only if we are collecting statistics
-  params.track_allocations = (stats_collector_ != nullptr);
-  params.rendezvous = rendezvous_;
-  params.cancellation_manager = cancellation_manager_;
-  params.call_frame = call_frame_;
-  params.function_library = impl_->params_.function_library;
-  params.resource_manager = device->resource_manager();
-  params.step_resource_manager = &step_resource_manager_;
-  params.slice_reader_cache = slice_reader_cache_;
-  params.inputs = &inputs;
-  params.input_device_contexts = &input_device_contexts;
-  params.frame_iter = FrameAndIter(0, 0);
-
-  Status s;
-  NodeExecStats* stats = nullptr;
-  bool completed = false;
-  inline_ready.push_back(id);
-  while (!inline_ready.empty()) {
-    id = inline_ready.front();
-    inline_ready.pop_front();
-    const NodeItem& item = nodes[id];
-    const Node* node = item.node;
-
-    // Set the device_context for this node id, if it exists.
-    auto dc_it = device_context_map_.find(id);
-    if (dc_it != device_context_map_.end()) {
-      params.op_device_context = dc_it->second;
-    }
-
-    if (stats_collector_) {
-      stats = new NodeExecStats;
-      stats->set_node_name(node->name());
-      nodestats::SetScheduled(stats, scheduled_usec);
-      nodestats::SetAllStart(stats);
-    }
-
-    VLOG(1) << "Process node: " << id << " " << SummarizeNodeDef(node->def());
-
-    // Prepares inputs.
-    s = PrepareInputs(item, &inputs, &input_device_contexts);
-    if (!s.ok()) {
-      // Continue to process the nodes in 'inline_ready'.
-      completed = NodeDone(s, item.node, ready, stats, &inline_ready);
-      continue;
-    }
-
-    OpKernel* op_kernel = item.kernel;
-    params.op_kernel = op_kernel;
-    params.output_alloc_attr = [this, node, op_kernel](int index) {
-      return OutputAttributes(&impl_->alloc_attr_, node, op_kernel, index);
-    };
-
-    // Asynchronous computes.
-    AsyncOpKernel* async = op_kernel->AsAsync();
-    if (async) {
-      auto pcopy = CopyParams(params);
-      auto ctx = new OpKernelContext(*pcopy);
-      auto done = [this, item, ctx, stats, pcopy]() {
-        VLOG(2) << this
-                << " Async kernel done: " << SummarizeNodeDef(item.node->def());
-        if (stats_collector_) nodestats::SetOpEnd(stats);
-        ReadyNodeIds ready;
-        Status s = ProcessOutputs(item, ctx, &ready, stats);
-        if (stats_collector_) nodestats::SetMemory(stats, ctx);
-        // Schedule to run all the ready ops in thread pool.
-        bool completed = NodeDone(s, item.node, ready, stats, nullptr);
-        delete ctx;
-        DeleteParams(pcopy);
-        if (completed) Finish();
-      };
-      if (stats_collector_) nodestats::SetOpStart(stats);
-      device->ComputeAsync(async, ctx, done);
-    } else {
-      // Synchronous computes.
-      OpKernelContext ctx(params);
-      if (stats_collector_) nodestats::SetOpStart(stats);
-      device->Compute(CHECK_NOTNULL(op_kernel), &ctx);
-      if (stats_collector_) nodestats::SetOpEnd(stats);
-
-      s = ProcessOutputs(item, &ctx, &ready, stats);
-      if (stats_collector_) nodestats::SetMemory(stats, &ctx);
-      if (stats_collector_) {
-        scheduled_usec = nodestats::NowInUsec();
-      }
-      completed = NodeDone(s, node, ready, stats, &inline_ready);
-    }
-  }  // while !inline_ready.empty()
-
-  // This thread of computation is done if completed = true.
-  if (completed) Finish();
-}
-
-bool SimpleExecutorState::NodeDone(const Status& s, const Node* node,
-                                   const ReadyNodeIds& ready,
-                                   NodeExecStats* stats,
-                                   std::deque<int>* inline_ready) {
-  if (stats_collector_) {
-    nodestats::SetAllEnd(stats);
-    if (!SetTimelineLabel(node, stats)) {
-      // Only record non-transfer nodes.
-      stats_collector_->Save(impl_->params_.device->name(), stats);
-    } else {
-      delete stats;
-    }
-  }
-
-  Rendezvous* captured_rendezvous = nullptr;  // Will be set on error.
-  if (!s.ok()) {
-    // Some error happened. This thread of computation is done.
-    mutex_lock l(mu_);
-    if (status_.ok()) {
-      captured_rendezvous = rendezvous_;
-      if (captured_rendezvous) captured_rendezvous->Ref();
-      status_ = s;
-    }
-  }
-  if (captured_rendezvous) {
-    // If we captured the rendezvous_ pointer, we are in an error condition.
-    // Use captured_rendezvous, in case "this" is deleted by another thread.
-    TRACEPRINTF("StartAbort: %s", s.ToString().c_str());
-    captured_rendezvous->StartAbort(s);
-    captured_rendezvous->Unref();
-  }
-
-  bool completed = false;
-  int ready_size = ready.size();
-  if (ready_size == 0 || !s.ok()) {
-    completed = (num_active_.fetch_sub(1) == 1);
-  } else if (ready_size > 1) {
-    num_active_.fetch_add(ready_size - 1, std::memory_order_relaxed);
-  }
-
-  // Schedule the ready nodes in 'ready'.
-  if (s.ok()) {
-    ScheduleReady(ready, inline_ready);
-  }
-  return completed;
-}
-
-void SimpleExecutorState::Finish() {
-  mu_.lock();
-  auto ret = status_;
-  auto done_cb = done_cb_;
-  auto runner = runner_;
-  mu_.unlock();
-  delete this;
-  CHECK(done_cb != nullptr);
-  runner([done_cb, ret]() { done_cb(ret); });
-}
-
-Status SimpleExecutorState::ProcessOutputs(const NodeItem& item,
-                                           OpKernelContext* ctx,
-                                           ReadyNodeIds* ready,
-                                           NodeExecStats* stats) {
-  Status s = ctx->status();
-  if (!s.ok()) {
-    s = AttachDef(s, item.kernel->def());
-    LOG(WARNING) << this << " Compute status: " << s;
-    return s;
-  }
-
-  // Processes outputs.
-  gtl::InlinedVector<Entry, 4> outputs;
-  const Node* node = item.node;
-  outputs.resize(node->num_outputs());
-
-  // Get the device_context for this node id, if it exists.
-  DeviceContext* device_context = nullptr;
-  auto dc_it = device_context_map_.find(node->id());
-  if (dc_it != device_context_map_.end()) {
-    device_context = dc_it->second;
-  }
-
-  for (int i = 0; i < node->num_outputs(); ++i) {
-    TensorValue val = ctx->release_output(i);
-    // Sanity check of output tensor types.
-    DataType dtype = val->dtype();
-    if (val.is_ref()) dtype = MakeRefType(dtype);
-    if (dtype == node->output_type(i)) {
-      Entry* out = &(outputs[i]);
-      if (val.is_ref()) {
-        out->ref = val.tensor;
-        out->ref_mu = val.mutex_if_ref;
-      } else {
-        out->val = *val.tensor;
-      }
-
-      // Set the device context of the output entry.
-      out->device_context = device_context;
-
-      // Set the allocator attributes of the output entry.
-      out->alloc_attr = ctx->output_alloc_attr(i);
-
-      if (stats_collector_ && val.tensor->IsInitialized()) {
-        nodestats::SetOutput(stats, i, ctx->output_allocation_type(i),
-                             val.tensor);
-      }
-    } else {
-      s.Update(
-          errors::Internal("Output ", i, " of type ", DataTypeString(dtype),
-                           " does not match declared output type ",
-                           DataTypeString(node->output_type(i)),
-                           " for operation ", SummarizeNodeDef(node->def())));
-    }
-    if (!val.is_ref()) {
-      // If OpKernelContext returns outputs via pass-by-value, we
-      // don't need this trouble.
-      delete val.tensor;
-    }
-  }
-  if (!s.ok()) return s;
-
-  // Clears inputs.
-  for (int i = 0; i < node->num_inputs(); ++i) {
-    input_tensors_[item.input_start + i].val = *kEmptyTensor;
-  }
-
-  // Propagates outputs along out edges.
-  ready->clear();
-  const std::vector<NodeItem>& nodes = impl_->nodes_;
-  for (const Edge* e : node->out_edges()) {
-    const int src_slot = e->src_output();
-    const int dst_id = e->dst()->id();
-    const NodeItem& dst_item = nodes[dst_id];
-    if (!e->IsControlEdge()) {
-      const int dst_slot = e->dst_input();
-      input_tensors_[dst_item.input_start + dst_slot] = outputs[src_slot];
-    }
-    if (pending_[dst_id].Dec()) {
-      ready->push_back(dst_id);
-    }
-  }
-  return Status::OK();
-}
-
-// NOTE(yuanbyu): Use the executor that supports control flow by default.
-const bool use_control_flow_executor = true;
-
 void ExecutorImpl::RunAsync(const Args& args, DoneCallback done) {
-  if (params_.has_control_flow || use_control_flow_executor) {
-    (new ExecutorState(args, this))->RunAsync(done);
-  } else {
-    (new SimpleExecutorState(args, this))->RunAsync(done);
-  }
+  (new ExecutorState(args, this))->RunAsync(done);
 }

 }  // end namespace
diff --git a/tensorflow/core/common_runtime/executor.h b/tensorflow/core/common_runtime/executor.h
index 00625fce60..8d98c7a11e 100644
--- a/tensorflow/core/common_runtime/executor.h
+++ b/tensorflow/core/common_runtime/executor.h
@@ -114,9 +114,6 @@ struct LocalExecutorParams {
   // The library runtime support.
   FunctionLibraryRuntime* function_library;

-  // True iff the computation contains control flow nodes.
-  bool has_control_flow;
-
   // create_kernel returns an instance of op kernel based on NodeDef.
   // delete_kernel is called for every kernel used by the executor
   // when the executor is deleted.
diff --git a/tensorflow/core/common_runtime/function.cc b/tensorflow/core/common_runtime/function.cc
index 670774b67e..e086098ff5 100644
--- a/tensorflow/core/common_runtime/function.cc
+++ b/tensorflow/core/common_runtime/function.cc
@@ -551,7 +551,6 @@ Status FunctionLibraryRuntimeImpl::CreateItem(Handle handle, Item** item) {
   LocalExecutorParams params;
   params.device = device_;
   params.function_library = this;
-  params.has_control_flow = false;
   params.create_kernel = create_kernel_;
   params.delete_kernel = [](OpKernel* kernel) {
     DeleteNonCachedKernel(kernel);
diff --git a/tensorflow/core/common_runtime/kernel_benchmark_testlib.cc b/tensorflow/core/common_runtime/kernel_benchmark_testlib.cc
index 2aec2168c9..e898f7ec84 100644
--- a/tensorflow/core/common_runtime/kernel_benchmark_testlib.cc
+++ b/tensorflow/core/common_runtime/kernel_benchmark_testlib.cc
@@ -70,18 +70,21 @@ Benchmark::Benchmark(const string& device, Graph* g,

   const int graph_def_version = g->version();

+  LocalExecutorParams params;
+  params.device = device_;
+  params.function_library = nullptr;
+  params.create_kernel = [this, graph_def_version](const NodeDef& ndef,
+                                                   OpKernel** kernel) {
+    return CreateNonCachedKernel(device_, nullptr, ndef, graph_def_version,
+                                 kernel);
+  };
+  params.delete_kernel = [](OpKernel* kernel) {
+    DeleteNonCachedKernel(kernel);
+  };
+
   if (init) {
     Executor* init_exec;
-    TF_CHECK_OK(NewLocalExecutor(
-        {
-            device_, nullptr, false,
-            [this, graph_def_version](const NodeDef& ndef, OpKernel** kernel) {
-              return CreateNonCachedKernel(device_, nullptr, ndef,
-                                           graph_def_version, kernel);
-            },
-            [](OpKernel* kernel) { DeleteNonCachedKernel(kernel); },
-        },
-        init, &init_exec));
+    TF_CHECK_OK(NewLocalExecutor(params, init, &init_exec));
     Executor::Args args;
     args.rendezvous = rendez_;
     args.runner = runner;
@@ -89,16 +92,7 @@ Benchmark::Benchmark(const string& device, Graph* g,
     delete init_exec;
   }

-  TF_CHECK_OK(NewLocalExecutor(
-      {
-          device_, nullptr, false,
-          [this, graph_def_version](const NodeDef& ndef, OpKernel** kernel) {
-            return CreateNonCachedKernel(device_, nullptr, ndef,
-                                         graph_def_version, kernel);
-          },
-          [](OpKernel* kernel) { DeleteNonCachedKernel(kernel); },
-      },
-      g, &exec_));
+  TF_CHECK_OK(NewLocalExecutor(params, g, &exec_));
 }

 Benchmark::~Benchmark() {
```
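One detail worth noting from the deleted code: `SimpleExecutorState` tracked per-node readiness with a small atomic `CountDown`, where the producer delivering a node's last input sees `Dec()` return true and schedules that node. Below is a standalone sketch of the same pattern in plain C++, independent of TensorFlow; the `main` driver is illustrative only:

```cpp
#include <atomic>
#include <cstdint>
#include <iostream>
#include <vector>

// Standalone sketch of the CountDown pattern from the deleted
// SimpleExecutorState. Each node i starts with pending[i] unsatisfied
// inputs; the producer that delivers the last input sees Dec() return
// true and marks the node ready. The acquire-load fast path skips the
// atomic read-modify-write when the counter is already down to 1.
class CountDown {
 public:
  CountDown() : v_(0) {}
  void Set(int32_t v) { v_.store(v); }
  bool Dec() {
    return v_.load(std::memory_order_acquire) == 1 || v_.fetch_sub(1) == 1;
  }

 private:
  std::atomic<std::int_fast32_t> v_;
};

int main() {
  std::vector<CountDown> pending(3);
  pending[2].Set(2);  // Node 2 waits on two inputs (nodes 0 and 1).
  std::cout << pending[2].Dec() << "\n";  // 0: one input still outstanding.
  std::cout << pending[2].Dec() << "\n";  // 1: last input arrived; node ready.
  return 0;
}
```

The deletion itself was low-risk: as the removed `NOTE(yuanbyu)` comment shows, `use_control_flow_executor` was hard-coded to `true`, so `ExecutorState` already handled every graph and `SimpleExecutorState` was unreachable in practice.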