From 07ee0c74226d641419e7ef4543817fb400a8b146 Mon Sep 17 00:00:00 2001
From: Derek Murray
Date: Thu, 18 Jan 2018 12:18:49 -0800
Subject: [tf.data] Further simplify the `CapturedFunction::Run*()` interfaces.

All users of these interfaces had to use the same boilerplate to create a
`FunctionLibraryRuntime::Options`. This change moves that boilerplate inside
the `CapturedFunction` implementation.
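
For illustration, a typical synchronous call site changes roughly as follows
(a sketch based on the `map_dataset_op.cc` hunk below, not an exact excerpt):

    // Before: every caller assembled the same FunctionLibraryRuntime::Options.
    FunctionLibraryRuntime::Options opts;
    opts.step_id = CapturedFunction::generate_step_id();
    ScopedStepContainer step_container(opts.step_id, [ctx](const string& name) {
      ctx->lib()->device()->resource_manager()->Cleanup(name).IgnoreError();
    });
    opts.step_container = &step_container;
    opts.runner = ctx->runner();
    Status s = dataset()->captured_func_->Run(ctx, opts, std::move(args), out_tensors);

    // After: the step ID, step container, and runner are set up inside
    // CapturedFunction::Run(), so the caller only passes its IteratorContext.
    Status s = dataset()->captured_func_->Run(ctx, std::move(args), out_tensors);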

PiperOrigin-RevId: 182418501
---
 tensorflow/core/kernels/data/captured_function.cc  | 186 +++++++++++----------
 tensorflow/core/kernels/data/captured_function.h   |  17 +-
 tensorflow/core/kernels/data/dataset_utils.cc      |  17 +-
 tensorflow/core/kernels/data/filter_dataset_op.cc  |  24 +--
 .../kernels/data/group_by_window_dataset_op.cc     |  43 +----
 .../core/kernels/data/map_and_batch_dataset_op.cc  |  30 +---
 tensorflow/core/kernels/data/map_dataset_op.cc     |  13 +-
 .../core/kernels/data/parallel_map_dataset_op.cc   |  20 +--
 tensorflow/core/kernels/data/scan_dataset_op.cc    |  10 +-
 9 files changed, 127 insertions(+), 233 deletions(-)

diff --git a/tensorflow/core/kernels/data/captured_function.cc b/tensorflow/core/kernels/data/captured_function.cc
index c50ac91c83..1f6d32f8df 100644
--- a/tensorflow/core/kernels/data/captured_function.cc
+++ b/tensorflow/core/kernels/data/captured_function.cc
@@ -35,20 +35,6 @@ Status CapturedFunction::Create(
 
 CapturedFunction::~CapturedFunction() {}
 
-Status CapturedFunction::set_lib(FunctionLibraryRuntime* lib) {
-  mutex_lock l(mu_);
-  if (lib_ == nullptr) {
-    lib_ = lib;
-    return Status::OK();
-  }
-  if (lib != lib_) {
-    return errors::Internal(
-        "Captured function was called with a different "
-        "FunctionLibraryRuntime*, which is not permitted.");
-  }
-  return Status::OK();
-}
-
 namespace {
 
 class CallFrameBase : public CallFrameInterface {
  public:
@@ -170,99 +156,129 @@ class BorrowedArgsCallFrame : public CallFrameBase {
 
 }  // namespace
 
 Status CapturedFunction::MaybeInstantiate(
-    FunctionLibraryRuntime* lib,
-    FunctionLibraryRuntime::InstantiateOptions inst_opts) {
-  TF_RETURN_IF_ERROR(set_lib(lib));
-  inst_opts.state_handle = std::to_string(random::New64());
+    IteratorContext* ctx, FunctionLibraryRuntime::Handle* out_handle) {
   mutex_lock l(mu_);
-  if (f_handle_ == kInvalidHandle) {
+  if (lib_ == nullptr) {
+    // The context's runtime will be used for all subsequent calls.
+    lib_ = ctx->lib();
+    DCHECK(f_handle_ == kInvalidHandle);
+    FunctionLibraryRuntime::InstantiateOptions inst_opts;
+    inst_opts.overlay_lib = ctx->function_library().get();
+    inst_opts.state_handle = std::to_string(random::New64());
     TF_RETURN_IF_ERROR(lib_->Instantiate(func_.name(), AttrSlice(&func_.attr()),
                                          inst_opts, &f_handle_));
+    const FunctionBody* fbody = lib_->GetFunctionBody(f_handle_);
+    if (fbody == nullptr) {
+      return errors::Internal("Failed to instantiate function body.");
+    }
+    ret_types_ = fbody->ret_types;
+  } else {
+    // TODO(mrry): Consider moving this under a shared lock, as it is
+    // the common case.
+    if (ctx->lib() != lib_) {
+      return errors::Internal(
+          "Captured function was called with a different "
+          "FunctionLibraryRuntime*, which is not permitted.");
+    }
   }
-  const FunctionBody* fbody = lib_->GetFunctionBody(f_handle_);
-  if (fbody == nullptr) {
-    return errors::Internal("Failed to instantiate function body.");
-  }
-  ret_types_ = fbody->ret_types;
+  *out_handle = f_handle_;
   return Status::OK();
 }
 
 Status CapturedFunction::Run(IteratorContext* ctx,
-                             FunctionLibraryRuntime::Options f_opts,
                              std::vector<Tensor>&& args,
                              std::vector<Tensor>* rets) {
-  FunctionLibraryRuntime::InstantiateOptions inst_opts;
-  inst_opts.overlay_lib = ctx->function_library().get();
-  TF_RETURN_IF_ERROR(MaybeInstantiate(ctx->lib(), inst_opts));
+  FunctionLibraryRuntime::Handle handle;
+  TF_RETURN_IF_ERROR(MaybeInstantiate(ctx, &handle));
+
+  FunctionLibraryRuntime::Options f_opts;
+  f_opts.step_id = CapturedFunction::generate_step_id();
+  ScopedStepContainer step_container(f_opts.step_id, [ctx](const string& name) {
+    ctx->lib()->device()->resource_manager()->Cleanup(name).IgnoreError();
+  });
+  f_opts.step_container = &step_container;
+  f_opts.runner = ctx->runner();
   // TODO(mrry): Add cancellation manager support to IteratorContext
   // so that we can cancel running map functions. The local
   // cancellation manager here is created so that we can run kernels
   // (such as queue kernels) that depend on the non-nullness of
   // `OpKernelContext::cancellation_manager()`, but additional effort
   // will be required to plumb it through the `IteratorContext`.
-  auto c_mgr = new CancellationManager;
-  auto frame =
-      new OwnedArgsCallFrame(std::move(args), &captured_inputs_, ret_types_);
-  f_opts.cancellation_manager = c_mgr;
+  CancellationManager c_mgr;
+  f_opts.cancellation_manager = &c_mgr;
+
+  OwnedArgsCallFrame frame(std::move(args), &captured_inputs_, ret_types_);
   Notification n;
   Status s;
-  mutex_lock l(mu_);
-  lib_->Run(f_opts, f_handle_, frame,
-            [rets, c_mgr, frame, &n, &s](Status func_status) {
-              delete c_mgr;
-              s.Update(func_status);
-              if (s.ok()) {
-                s = frame->ConsumeRetvals(rets);
-              }
-              delete frame;
-              n.Notify();
-            });
+  ctx->lib()->Run(f_opts, handle, &frame, [&n, &s](Status func_status) {
+    s.Update(func_status);
+    n.Notify();
+  });
   n.WaitForNotification();
-  return s;
+  TF_RETURN_IF_ERROR(s);
+  return frame.ConsumeRetvals(rets);
 }
 
-Status CapturedFunction::RunWithBorrowedArgs(
-    IteratorContext* ctx, FunctionLibraryRuntime::Options f_opts,
-    const std::vector<Tensor>& args, std::vector<Tensor>* rets) {
-  FunctionLibraryRuntime::InstantiateOptions inst_opts;
-  inst_opts.overlay_lib = ctx->function_library().get();
-  TF_RETURN_IF_ERROR(MaybeInstantiate(ctx->lib(), inst_opts));
+Status CapturedFunction::RunWithBorrowedArgs(IteratorContext* ctx,
+                                             const std::vector<Tensor>& args,
+                                             std::vector<Tensor>* rets) {
+  FunctionLibraryRuntime::Handle handle;
+  TF_RETURN_IF_ERROR(MaybeInstantiate(ctx, &handle));
+
+  FunctionLibraryRuntime::Options f_opts;
+  f_opts.step_id = CapturedFunction::generate_step_id();
+  ScopedStepContainer step_container(f_opts.step_id, [ctx](const string& name) {
+    ctx->lib()->device()->resource_manager()->Cleanup(name).IgnoreError();
+  });
+  f_opts.step_container = &step_container;
+  f_opts.runner = ctx->runner();
   // TODO(mrry): Add cancellation manager support to IteratorContext
   // so that we can cancel running map functions. The local
   // cancellation manager here is created so that we can run kernels
   // (such as queue kernels) that depend on the non-nullness of
   // `OpKernelContext::cancellation_manager()`, but additional effort
   // will be required to plumb it through the `IteratorContext`.
-  auto c_mgr = new CancellationManager;
+  CancellationManager c_mgr;
+  f_opts.cancellation_manager = &c_mgr;
+
   BorrowedArgsCallFrame frame(args, &captured_inputs_, ret_types_);
-  f_opts.cancellation_manager = c_mgr;
   Notification n;
   Status s;
-  mutex_lock l(mu_);
-  lib_->Run(f_opts, f_handle_, &frame,
-            [rets, c_mgr, &frame, &n, &s](Status func_status) {
-              delete c_mgr;
-              s.Update(func_status);
-              if (s.ok()) {
-                s = frame.ConsumeRetvals(rets);
-              }
-              n.Notify();
-            });
+  ctx->lib()->Run(f_opts, handle, &frame, [&n, &s](Status func_status) {
+    s.Update(func_status);
+    n.Notify();
+  });
   n.WaitForNotification();
-  return s;
+  TF_RETURN_IF_ERROR(s);
+  return frame.ConsumeRetvals(rets);
 }
 
-void CapturedFunction::RunAsync(
-    FunctionLibraryRuntime* lib,
-    FunctionLibraryRuntime::InstantiateOptions inst_opts,
-    FunctionLibraryRuntime::Options f_opts, std::vector<Tensor>&& args,
-    std::vector<Tensor>* rets, FunctionLibraryRuntime::DoneCallback done) {
-  Status s = MaybeInstantiate(lib, inst_opts);
+void CapturedFunction::RunAsync(IteratorContext* ctx,
+                                std::vector<Tensor>&& args,
+                                std::vector<Tensor>* rets,
+                                FunctionLibraryRuntime::DoneCallback done) {
+  // NOTE(mrry): This method does not transfer ownership of `ctx`, and it may
+  // be deleted before `done` is called. Take care not to capture `ctx` in any
+  // code that may execute asynchronously in this function.
+  FunctionLibraryRuntime::Handle handle;
+  Status s = MaybeInstantiate(ctx, &handle);
   if (!s.ok()) {
     done(s);
     return;
   }
+  auto frame =
+      new OwnedArgsCallFrame(std::move(args), &captured_inputs_, ret_types_);
+
+  FunctionLibraryRuntime::Options f_opts;
+  f_opts.step_id = CapturedFunction::generate_step_id();
+  ResourceMgr* resource_mgr = ctx->lib()->device()->resource_manager();
+  auto step_container = new ScopedStepContainer(
+      f_opts.step_id, [resource_mgr](const string& name) {
+        resource_mgr->Cleanup(name).IgnoreError();
+      });
+  f_opts.step_container = step_container;
+  f_opts.runner = ctx->runner();
   // TODO(mrry): Add cancellation manager support to IteratorContext
   // so that we can cancel running map functions. The local
   // cancellation manager here is created so that we can run kernels
@@ -270,24 +286,24 @@ void CapturedFunction::RunAsync(
   // `OpKernelContext::cancellation_manager()`, but additional effort
   // will be required to plumb it through the `IteratorContext`.
   auto c_mgr = new CancellationManager;
-  auto frame =
-      new OwnedArgsCallFrame(std::move(args), &captured_inputs_, ret_types_);
   f_opts.cancellation_manager = c_mgr;
-  mutex_lock l(mu_);
-  lib_->Run(f_opts, f_handle_, frame,
-            std::bind(
-                [rets, c_mgr, frame](FunctionLibraryRuntime::DoneCallback done,
-                                     // Begin unbound arguments.
-                                     Status s) {
-                  delete c_mgr;
-                  if (s.ok()) {
-                    s = frame->ConsumeRetvals(rets);
-                  }
-                  delete frame;
-                  done(s);
-                },
-                std::move(done), std::placeholders::_1));
+  tf_shared_lock l(mu_);
+  ctx->lib()->Run(f_opts, handle, frame,
+                  std::bind(
+                      [rets, step_container, c_mgr, frame](
+                          FunctionLibraryRuntime::DoneCallback done,
+                          // Begin unbound arguments.
+                          Status s) {
+                        delete step_container;
+                        delete c_mgr;
+                        if (s.ok()) {
+                          s = frame->ConsumeRetvals(rets);
+                        }
+                        delete frame;
+                        done(s);
+                      },
+                      std::move(done), std::placeholders::_1));
 }
 
 CapturedFunction::CapturedFunction(const NameAttrList& func,
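
The new `MaybeInstantiate()` pins the first `FunctionLibraryRuntime` it sees and
caches one instantiated handle per `CapturedFunction`, each with its own
`state_handle`, so stateful kernels inside the function keep per-instance state.
A rough sketch of that contract; `MakeIteratorContext`, `lib`, `other_lib`,
`args`, and `rets` are hypothetical stand-ins for the iterator's real setup:

    // First call pins ctx->lib() and instantiates the function once.
    IteratorContext ctx = MakeIteratorContext(lib);  // hypothetical helper
    TF_CHECK_OK(captured_func->Run(&ctx, std::move(args), &rets));

    // Later calls through the same runtime reuse the cached handle.
    TF_CHECK_OK(captured_func->Run(&ctx, std::move(more_args), &rets));

    // A call through a context backed by a different runtime is rejected with
    // errors::Internal("Captured function was called with a different
    // FunctionLibraryRuntime*, which is not permitted.").
    IteratorContext other_ctx = MakeIteratorContext(other_lib);
    Status s = captured_func->Run(&other_ctx, std::move(other_args), &rets);
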
diff --git a/tensorflow/core/kernels/data/captured_function.h b/tensorflow/core/kernels/data/captured_function.h
index 6ad80d04ff..99e0ef426e 100644
--- a/tensorflow/core/kernels/data/captured_function.h
+++ b/tensorflow/core/kernels/data/captured_function.h
@@ -54,14 +54,13 @@ class CapturedFunction {
   // tensors in `args`, in order to be able to deallocate them as early as
   // possible. Use `RunWithBorrowedArgs()` if the caller needs to retain
   // ownership of the `args`.
-  Status Run(IteratorContext* ctx, FunctionLibraryRuntime::Options f_opts,
-             std::vector<Tensor>&& args, std::vector<Tensor>* rets);
+  Status Run(IteratorContext* ctx, std::vector<Tensor>&& args,
+             std::vector<Tensor>* rets);
 
   // Synchronously runs the captured function on the given `args`, and stores
   // the results in `*rets`. Prefer to use `Run()` or `RunAsync()` when
   // possible.
   Status RunWithBorrowedArgs(IteratorContext* ctx,
-                             FunctionLibraryRuntime::Options f_opts,
                              const std::vector<Tensor>& args,
                              std::vector<Tensor>* rets);
 
@@ -69,10 +68,8 @@ class CapturedFunction {
   // the results in `*rets`, and calls the given `done` callback when the
   // function returns. This method takes ownership of the tensors in `args`,
   // in order to be able to deallocate them as early as possible.
-  void RunAsync(FunctionLibraryRuntime* lib,
-                FunctionLibraryRuntime::InstantiateOptions inst_opts,
-                FunctionLibraryRuntime::Options f_opts,
-                std::vector<Tensor>&& args, std::vector<Tensor>* rets,
+  void RunAsync(IteratorContext* ctx, std::vector<Tensor>&& args,
+                std::vector<Tensor>* rets,
                 FunctionLibraryRuntime::DoneCallback done);
 
   // Returns that additional captured inputs that will be passed to the function
@@ -93,10 +90,8 @@ class CapturedFunction {
   CapturedFunction(const NameAttrList& func,
                    std::vector<Tensor> captured_inputs);
 
-  Status set_lib(FunctionLibraryRuntime* lib);
-
-  Status MaybeInstantiate(FunctionLibraryRuntime* lib,
-                          FunctionLibraryRuntime::InstantiateOptions inst_opts);
+  Status MaybeInstantiate(IteratorContext* ctx,
+                          FunctionLibraryRuntime::Handle* out_handle);
 
   mutex mu_;
   const NameAttrList func_;
diff --git a/tensorflow/core/kernels/data/dataset_utils.cc b/tensorflow/core/kernels/data/dataset_utils.cc
index 82786ceb98..e3a3601ee8 100644
--- a/tensorflow/core/kernels/data/dataset_utils.cc
+++ b/tensorflow/core/kernels/data/dataset_utils.cc
@@ -14,7 +14,6 @@ limitations under the License.
 ==============================================================================*/
 
 #include "tensorflow/core/kernels/data/dataset_utils.h"
-#include "tensorflow/core/common_runtime/device.h"
 
 namespace tensorflow {
 
@@ -24,22 +23,10 @@ Status MakeIteratorFromInputElement(
     IteratorContext* ctx, const std::vector<Tensor>& input_element,
     int64 thread_index, CapturedFunction* captured_func, StringPiece prefix,
    std::unique_ptr<IteratorBase>* out_iterator) {
-  FunctionLibraryRuntime::Options opts;
-  opts.runner = ctx->runner();
-  // Choose a step ID that is guaranteed not to clash with any
-  // Session-generated step ID. DirectSession only generates
-  // non-negative step IDs (contiguous, starting from 0), and
-  // MasterSession generates 56-bit random step IDs whose MSB
-  // is always 0, so a negative random step ID should suffice.
-  opts.step_id = CapturedFunction::generate_step_id();
-  ScopedStepContainer step_container(opts.step_id, [ctx](const string& name) {
-    ctx->lib()->device()->resource_manager()->Cleanup(name).IgnoreError();
-  });
-  opts.step_container = &step_container;
   std::vector<Tensor> return_values;
-  TF_RETURN_IF_ERROR(captured_func->RunWithBorrowedArgs(
-      ctx, opts, input_element, &return_values));
+  TF_RETURN_IF_ERROR(
+      captured_func->RunWithBorrowedArgs(ctx, input_element, &return_values));
   if (!(return_values.size() == 1 && return_values[0].dtype() == DT_VARIANT &&
         TensorShapeUtils::IsScalar(return_values[0].shape()))) {
diff --git a/tensorflow/core/kernels/data/filter_dataset_op.cc b/tensorflow/core/kernels/data/filter_dataset_op.cc
index 4e2d1c5474..d16b5b7d41 100644
--- a/tensorflow/core/kernels/data/filter_dataset_op.cc
+++ b/tensorflow/core/kernels/data/filter_dataset_op.cc
@@ -143,30 +143,14 @@ class FilterDatasetOp : public UnaryDatasetOpKernel {
           return Status::OK();
         }
 
-        FunctionLibraryRuntime::Options opts;
-        opts.step_id = CapturedFunction::generate_step_id();
-        ScopedStepContainer step_container(opts.step_id,
-                                           [ctx](const string& name) {
-                                             ctx->lib()
-                                                 ->device()
-                                                 ->resource_manager()
-                                                 ->Cleanup(name)
-                                                 .IgnoreError();
-                                           });
-        opts.step_container = &step_container;
-        opts.runner = ctx->runner();
         // TODO(mrry): Avoid blocking a threadpool thread. We will need to
         // stack-rip the iterators and use async kernels.
-        Notification n;
-        Status ret;
         std::vector<Tensor> result;
-        ret = dataset()->captured_func_->RunWithBorrowedArgs(
-            ctx, opts, *out_tensors, &result);
+        TF_RETURN_IF_ERROR(dataset()->captured_func_->RunWithBorrowedArgs(
+            ctx, *out_tensors, &result));
 
-        if (!ret.ok()) {
-          return ret;
-        } else if (result.size() != 1 || result[0].dtype() != DT_BOOL ||
-                   result[0].NumElements() != 1) {
+        if (result.size() != 1 || result[0].dtype() != DT_BOOL ||
+            result[0].NumElements() != 1) {
           return errors::InvalidArgument(
               "Filter predicate `f` must return a scalar bool.");
         }
diff --git a/tensorflow/core/kernels/data/group_by_window_dataset_op.cc b/tensorflow/core/kernels/data/group_by_window_dataset_op.cc
index b5e755694d..eb047e10ec 100644
--- a/tensorflow/core/kernels/data/group_by_window_dataset_op.cc
+++ b/tensorflow/core/kernels/data/group_by_window_dataset_op.cc
@@ -232,25 +232,12 @@ class GroupByWindowDatasetOp : public UnaryDatasetOpKernel {
               input_impl_->GetNext(ctx, &next_input_element, &end_of_input_));
 
           if (!end_of_input_) {
-            FunctionLibraryRuntime::Options opts;
-            opts.step_id = CapturedFunction::generate_step_id();
-            opts.runner = ctx->runner();
-            ScopedStepContainer step_container(opts.step_id,
-                                               [ctx](const string& name) {
-                                                 ctx->lib()
-                                                     ->device()
-                                                     ->resource_manager()
-                                                     ->Cleanup(name)
-                                                     .IgnoreError();
-                                               });
-            opts.step_container = &step_container;
-
             // Run the key function on the input element to identify its
             // group.
             std::vector<Tensor> key_func_output;
             TF_RETURN_IF_ERROR(
                 dataset()->captured_key_func_->RunWithBorrowedArgs(
-                    ctx, opts, next_input_element, &key_func_output));
+                    ctx, next_input_element, &key_func_output));
 
             if (key_func_output.size() != 1 ||
                 key_func_output[0].dtype() != DT_INT64 ||
@@ -262,26 +249,11 @@ class GroupByWindowDatasetOp : public UnaryDatasetOpKernel {
             const int64 key = key_func_output[0].scalar<int64>()();
 
             if (window_sizes_.find(key) == window_sizes_.end()) {
-              // Run window_size function
-              FunctionLibraryRuntime::Options opts2;
-              opts2.step_id = CapturedFunction::generate_step_id();
-              opts2.runner = ctx->runner();
-              ScopedStepContainer step_container2(opts2.step_id,
-                                                  [ctx](const string& name) {
-                                                    ctx->lib()
-                                                        ->device()
-                                                        ->resource_manager()
-                                                        ->Cleanup(name)
-                                                        .IgnoreError();
-                                                  });
-              opts2.step_container = &step_container2;
-
               // Run the window size function on the key to identify its
               // window size.
               std::vector<Tensor> window_size_func_output;
               TF_RETURN_IF_ERROR(dataset()->captured_window_size_func_->Run(
-                  ctx, opts2, std::move(key_func_output),
-                  &window_size_func_output));
+                  ctx, std::move(key_func_output), &window_size_func_output));
 
               if (window_size_func_output.size() != 1 ||
                   window_size_func_output[0].dtype() != DT_INT64 ||
@@ -475,15 +447,6 @@ class GroupByWindowDatasetOp : public UnaryDatasetOpKernel {
 
     Status StartFlushingGroup(IteratorContext* ctx, int64 key)
        EXCLUSIVE_LOCKS_REQUIRED(mu_) {
-      FunctionLibraryRuntime::Options opts;
-      opts.step_id = CapturedFunction::generate_step_id();
-      opts.runner = ctx->runner();
-      ScopedStepContainer step_container(opts.step_id, [ctx](const string&
-                                                                 name) {
-        ctx->lib()->device()->resource_manager()->Cleanup(name).IgnoreError();
-      });
-      opts.step_container = &step_container;
-
       DatasetBase* group_dataset;
       TF_RETURN_IF_ERROR(NewWindowDataset(
           groups_[key], dataset()->input_->output_dtypes(),
@@ -500,7 +463,7 @@ class GroupByWindowDatasetOp : public UnaryDatasetOpKernel {
           {std::move(key_arg), std::move(group_dataset_arg)});
       std::vector<Tensor> return_values;
       TF_RETURN_IF_ERROR(dataset()->captured_reduce_func_->Run(
-          ctx, opts, std::move(args), &return_values));
+          ctx, std::move(args), &return_values));
 
       if (!(return_values.size() == 1 &&
             return_values[0].dtype() == DT_VARIANT &&
diff --git a/tensorflow/core/kernels/data/map_and_batch_dataset_op.cc b/tensorflow/core/kernels/data/map_and_batch_dataset_op.cc
index 4c4156ced0..c529f671f2 100644
--- a/tensorflow/core/kernels/data/map_and_batch_dataset_op.cc
+++ b/tensorflow/core/kernels/data/map_and_batch_dataset_op.cc
@@ -279,31 +279,13 @@ class MapAndBatchDatasetOp : public UnaryDatasetOpKernel {
       // Call `captured_func_(input_element)`, store the result in
       // `result->return_values`, and notify `batch_result->counter`
       // to unblock a consumer.
-      FunctionLibraryRuntime::Options opts;
-      opts.step_id = CapturedFunction::generate_step_id();
-      ResourceMgr* resource_mgr = ctx->lib()->device()->resource_manager();
-      ScopedStepContainer* step_container = new ScopedStepContainer(
-          opts.step_id, [resource_mgr](const string& name) {
-            resource_mgr->Cleanup(name).IgnoreError();
-          });
-      opts.step_container = step_container;
-      std::function<void(std::function<void()>)>* runner =
-          new std::function<void(std::function<void()>)>(*ctx->runner());
-      opts.runner = runner;
-      FunctionLibraryRuntime* lib = ctx->lib();
-      FunctionLibraryRuntime::InstantiateOptions inst_opts;
-      inst_opts.overlay_lib = ctx->function_library().get();
-
       (*ctx->runner())(std::bind(
-          [this, lib, inst_opts, opts, result, step_container, runner,
-           batch_result, offset](std::vector<Tensor> input_element) {
+          [this, result, batch_result, offset](
+              IteratorContext* ctx, std::vector<Tensor> input_element) {
            dataset()->captured_func_->RunAsync(
-                lib, inst_opts, opts, std::move(input_element),
-                &result->return_values,
-                [this, step_container, runner, result, batch_result,
-                 offset](Status ret_status) {
-                  delete step_container;
-                  delete runner;
+                ctx, std::move(input_element), &result->return_values,
+                [this, ctx, result, batch_result, offset](Status ret_status) {
+                  delete ctx;
                   result->status.Update(ret_status);
                   if (ret_status.ok()) {
                     EnsureOutputAllocated(batch_result,
@@ -345,7 +327,7 @@ class MapAndBatchDatasetOp : public UnaryDatasetOpKernel {
                   batch_result->counter->DecrementCount();
                 });
           },
-          std::move(input_element)));
+          new IteratorContext(*ctx), std::move(input_element)));
     }
 
     void StartInvocationBatch(IteratorContext* ctx, int64 batch_index)
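
As the `NOTE(mrry)` comment in `captured_function.cc` explains, `RunAsync()` does
not take ownership of the `IteratorContext`. A call site that defers the
invocation to another thread, as `map_and_batch` does above, therefore binds a
heap-allocated copy of the context and deletes it in the completion callback,
while a call site that invokes `RunAsync()` inline (such as `parallel_map`
below) can pass `ctx` directly. A rough sketch of the deferred pattern;
`return_values_` stands in for wherever the call site stores the outputs, and
this is not an exact excerpt:

    // Deferred invocation: the bound lambda may run after the caller's
    // stack-allocated context is gone, so bind a heap-allocated copy and
    // delete it once the function's `done` callback fires.
    (*ctx->runner())(std::bind(
        [this](IteratorContext* ctx, std::vector<Tensor> input_element) {
          dataset()->captured_func_->RunAsync(
              ctx, std::move(input_element), &return_values_,
              [this, ctx](Status ret_status) {
                delete ctx;  // release the copied context
                // ... record `ret_status` and notify the waiting consumer ...
              });
        },
        new IteratorContext(*ctx), std::move(input_element)));
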
diff --git a/tensorflow/core/kernels/data/map_dataset_op.cc b/tensorflow/core/kernels/data/map_dataset_op.cc
index e98eebaea1..01f9b9fa09 100644
--- a/tensorflow/core/kernels/data/map_dataset_op.cc
+++ b/tensorflow/core/kernels/data/map_dataset_op.cc
@@ -140,19 +140,10 @@ class MapDatasetOp : public UnaryDatasetOpKernel {
         return Status::OK();
       }
 
-      FunctionLibraryRuntime::Options opts;
-      opts.step_id = CapturedFunction::generate_step_id();
-
-      ScopedStepContainer step_container(opts.step_id, [ctx](const string&
-                                                                 name) {
-        ctx->lib()->device()->resource_manager()->Cleanup(name).IgnoreError();
-      });
-      opts.step_container = &step_container;
-      opts.runner = ctx->runner();
       // TODO(mrry): Avoid blocking a threadpool thread. We will need to
       // stack-rip the iterators and use async kernels.
-      Status s = dataset()->captured_func_->Run(ctx, opts, std::move(args),
-                                                out_tensors);
+      Status s =
+          dataset()->captured_func_->Run(ctx, std::move(args), out_tensors);
       if (errors::IsOutOfRange(s)) {
         // `f` may deliberately raise `errors::OutOfRange` to indicate
         // that we should terminate the iteration early.
diff --git a/tensorflow/core/kernels/data/parallel_map_dataset_op.cc b/tensorflow/core/kernels/data/parallel_map_dataset_op.cc
index dd4fde3286..f09871d98d 100644
--- a/tensorflow/core/kernels/data/parallel_map_dataset_op.cc
+++ b/tensorflow/core/kernels/data/parallel_map_dataset_op.cc
@@ -326,25 +326,9 @@ class ParallelMapDatasetOp : public UnaryDatasetOpKernel {
      // `result->return_values`, and notify `result->notification`
      // to unblock a consumer.
       result->notification.reset(new Notification);
-
-      FunctionLibraryRuntime::Options opts;
-      opts.step_id = CapturedFunction::generate_step_id();
-      ResourceMgr* resource_manager =
-          ctx->lib()->device()->resource_manager();
-      ScopedStepContainer* step_container = new ScopedStepContainer(
-          opts.step_id, [resource_manager](const string& name) {
-            resource_manager->Cleanup(name).IgnoreError();
-          });
-      opts.step_container = step_container;
-      opts.runner = ctx->runner();
-      FunctionLibraryRuntime::InstantiateOptions inst_opts;
-      inst_opts.overlay_lib = ctx->function_library().get();
-
       dataset()->captured_func_->RunAsync(
-          ctx->lib(), inst_opts, opts, std::move(input_element),
-          &result->return_values,
-          [result, step_container, result_index](Status ret_status) {
-            delete step_container;
+          ctx, std::move(input_element), &result->return_values,
+          [result, result_index](Status ret_status) {
             result->status.Update(ret_status);
             result->notification->Notify();
           });
diff --git a/tensorflow/core/kernels/data/scan_dataset_op.cc b/tensorflow/core/kernels/data/scan_dataset_op.cc
index 05cd63d361..5dd6ff848e 100644
--- a/tensorflow/core/kernels/data/scan_dataset_op.cc
+++ b/tensorflow/core/kernels/data/scan_dataset_op.cc
@@ -170,19 +170,11 @@ class ScanDatasetOp : public UnaryDatasetOpKernel {
       std::copy(next_element.begin(), next_element.end(),
                 std::back_inserter(args));
 
-      FunctionLibraryRuntime::Options opts;
-      opts.step_id = CapturedFunction::generate_step_id();
-      ScopedStepContainer step_container(opts.step_id, [ctx](const string&
-                                                                 name) {
-        ctx->lib()->device()->resource_manager()->Cleanup(name).IgnoreError();
-      });
-      opts.step_container = &step_container;
-      opts.runner = ctx->runner();
       std::vector<Tensor> state_and_output;
       state_and_output.reserve(dataset()->state_types_.size() +
                                output_dtypes().size());
-      Status s = dataset()->captured_func_->Run(ctx, opts, std::move(args),
+      Status s = dataset()->captured_func_->Run(ctx, std::move(args),
                                                 &state_and_output);
       if (s.ok()) {
         state_.clear();
-- 
cgit v1.2.3