diff options
Diffstat (limited to 'tensorflow/compiler/xla/service/cpu/parallel_cpu_executable.cc')
-rw-r--r-- | tensorflow/compiler/xla/service/cpu/parallel_cpu_executable.cc | 146 |
1 files changed, 32 insertions, 114 deletions
diff --git a/tensorflow/compiler/xla/service/cpu/parallel_cpu_executable.cc b/tensorflow/compiler/xla/service/cpu/parallel_cpu_executable.cc index 0077e344e2..d1b88b27f0 100644 --- a/tensorflow/compiler/xla/service/cpu/parallel_cpu_executable.cc +++ b/tensorflow/compiler/xla/service/cpu/parallel_cpu_executable.cc @@ -376,19 +376,6 @@ Status ParallelCpuExecutable::ExecuteComputeFunctions( tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments, tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> buffers, HloExecutionProfile* hlo_execution_profile) { - std::vector<se::DeviceMemoryBase> argument_buffers(arguments.size()); - for (int i = 0; i < arguments.size(); ++i) { - argument_buffers[i] = arguments[i]->buffer(/*index=*/{}); - } - return ExecuteComputeFunctions(run_options, argument_buffers, buffers, - hlo_execution_profile); -} - -Status ParallelCpuExecutable::ExecuteComputeFunctions( - const ServiceExecutableRunOptions* run_options, - tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> arguments, - tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> buffers, - HloExecutionProfile* hlo_execution_profile) { // Allocate profiling counters for each hlo instruction that we would like to // profile. std::vector<int64>* profile_counters = nullptr; @@ -428,8 +415,9 @@ Status ParallelCpuExecutable::ExecuteComputeFunctions( // just copy the existing buffer into the map containing instruction // results.. if (instruction->opcode() == HloOpcode::kParameter) { - InsertOrDie(&results, instruction, - arguments[instruction->parameter_number()].opaque()); + InsertOrDie( + &results, instruction, + arguments[instruction->parameter_number()]->root_buffer().opaque()); } else if (instruction->opcode() == HloOpcode::kConstant) { unsigned char* aligned_data = FindOrDie(aligned_constants_, instruction).get(); @@ -461,69 +449,6 @@ Status ParallelCpuExecutable::ExecuteComputeFunctions( return Status::OK(); } -StatusOr<perftools::gputools::DeviceMemoryBase> -ParallelCpuExecutable::ExecuteOnStream( - const ServiceExecutableRunOptions* run_options, - tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> arguments, - HloExecutionProfile* hlo_execution_profile) { - se::Stream* stream = run_options->stream(); - DeviceMemoryAllocator* memory_allocator = run_options->allocator(); - VLOG(3) << "ExecuteOnStream arg size: " << arguments.size(); - if (!arguments.empty()) { - VLOG(3) << "ExecuteOnStream arg[0]: " << arguments.at(0).opaque(); - } - - // Allocate the temporary buffers required for the computation. - se::StreamExecutor* stream_executor = stream->parent(); - int device_ordinal = stream_executor->device_ordinal(); - int64 buffer_count = assignment_->Allocations().size(); - VLOG(3) << "temp buffer count: " << buffer_count; - - std::vector<se::DeviceMemoryBase> device_allocations( - assignment_->Allocations().size()); - TF_RETURN_IF_ERROR(AllocateBuffers(memory_allocator, - stream->parent()->device_ordinal(), - &device_allocations)); - - TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice result_slice, - assignment_->GetUniqueTopLevelOutputSlice()); - const BufferAllocation::Index result_index = result_slice.index(); - VLOG(3) << "result index: " << result_index; - - TF_RETURN_IF_ERROR(ExecuteComputeFunctions( - run_options, arguments, device_allocations, hlo_execution_profile)); - - // Mark the buffers that are actually live (used in the output) when the - // computation finishes executing. - std::unordered_set<const void*> marked_addresses; - MarkLiveAddressesInOutput(device_allocations[result_index].opaque(), - result_shape(), &marked_addresses); - - VLOG(3) << "Live addresses in output marking found " - << marked_addresses.size() << " addresses:\n" - << tensorflow::str_util::Join( - marked_addresses, ", ", [](string* out, const void* address) { - tensorflow::strings::StrAppend( - out, tensorflow::strings::Printf("%p", address)); - }); - - // Computation is done - deallocate temp buffers. Keep those marked - // live because they are referenced by the output of the computation - // and are needed by the service. They will be deallocated by the - // service. - for (size_t i = 0; i < device_allocations.size(); ++i) { - auto alloc = device_allocations[i]; - if (marked_addresses.count(alloc.opaque()) == 0 && - alloc.opaque() != nullptr) { - VLOG(3) << "ParallelCpuExecutable deallocating buffer #" << i << " [" - << alloc.opaque() << "]"; - TF_RETURN_IF_ERROR(memory_allocator->Deallocate(device_ordinal, &alloc)); - } - } - - return device_allocations[result_index]; -} - StatusOr<std::unique_ptr<ShapedBuffer>> ParallelCpuExecutable::ExecuteOnStream( const ServiceExecutableRunOptions* run_options, tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments, @@ -536,9 +461,9 @@ StatusOr<std::unique_ptr<ShapedBuffer>> ParallelCpuExecutable::ExecuteOnStream( DeviceMemoryAllocator* memory_allocator = run_options->allocator(); std::vector<se::DeviceMemoryBase> buffers(assignment_->Allocations().size()); - auto result_buffer = - MakeUnique<ShapedBuffer>(result_shape(), stream->parent()->platform(), - stream->parent()->device_ordinal()); + auto result_buffer = MakeUnique<ShapedBuffer>( + /*on_host_shape=*/result_shape(), /*on_device_shape=*/result_shape(), + stream->parent()->platform(), stream->parent()->device_ordinal()); TF_RETURN_IF_ERROR(AllocateBuffers( memory_allocator, stream->parent()->device_ordinal(), &buffers)); @@ -549,37 +474,30 @@ StatusOr<std::unique_ptr<ShapedBuffer>> ParallelCpuExecutable::ExecuteOnStream( // Copy DeviceMemoryBase values which into the respective location in // ShapedBuffer which is returned to the caller. std::vector<bool> buffers_in_result(assignment_->Allocations().size(), false); - TF_RETURN_IF_ERROR( - result_buffer->mutable_shape_index_to_buffer_entry() - ->ForEachMutableElementWithStatus( - [&buffers, &buffers_in_result, &result_buffer, this]( - const ShapeIndex& index, size_t* buffer_entry) { - const auto& sources = - this->GetRootPointsToSet().element(index); - // The points to set is unambiguous so the set should be a - // singleton. - CHECK_EQ(1, sources.size()); - const LogicalBuffer* buffer_source = sources[0]; - HloInstruction* src = buffer_source->instruction(); - - // The source for this result buffer can be a nested buffer - // such as a tuple element. - - // The source instruction should have a non-parameter buffer - // assigned. - TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice slice, - this->assignment_->GetUniqueSlice( - src, buffer_source->index())); - CHECK(!slice.allocation()->is_entry_computation_parameter()); - - const BufferAllocation::Index buffer_index = slice.index(); - const se::DeviceMemoryBase& buffer = buffers[buffer_index]; - CHECK(!buffer.is_null() || buffer.size() == 0); - *buffer_entry = result_buffer->mutable_buffers()->size(); - result_buffer->mutable_buffers()->push_back(buffer); - buffers_in_result[buffer_index] = true; - return Status::OK(); - })); + TF_RETURN_IF_ERROR(result_buffer->buffers().ForEachMutableElementWithStatus( + [&](const ShapeIndex& index, se::DeviceMemoryBase* device_memory) { + const auto& sources = this->GetRootPointsToSet().element(index); + + // The points to set is unambiguous so the set should be a singleton. + CHECK_EQ(1, sources.size()); + const LogicalBuffer* buffer_source = sources[0]; + HloInstruction* src = buffer_source->instruction(); + + // The source for this result buffer can be a nested buffer such as a + // tuple element. The source instruction should have a non-parameter + // buffer assigned. + TF_ASSIGN_OR_RETURN( + const BufferAllocation::Slice slice, + this->assignment_->GetUniqueSlice(src, buffer_source->index())); + CHECK(!slice.allocation()->is_entry_computation_parameter()); + + const BufferAllocation::Index buffer_index = slice.index(); + const se::DeviceMemoryBase& buffer = buffers[buffer_index]; + CHECK(!buffer.is_null() || buffer.size() == 0); + *device_memory = buffer; + buffers_in_result[buffer_index] = true; + return Status::OK(); + })); // Free all buffers not in the result. for (size_t i = 0; i < buffers.size(); ++i) { @@ -595,10 +513,10 @@ StatusOr<std::unique_ptr<ShapedBuffer>> ParallelCpuExecutable::ExecuteOnStream( return std::move(result_buffer); } -StatusOr<perftools::gputools::DeviceMemoryBase> +StatusOr<std::unique_ptr<ShapedBuffer>> ParallelCpuExecutable::ExecuteAsyncOnStream( const ServiceExecutableRunOptions* run_options, - tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> arguments) { + tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments) { // TODO(b/30671675): Implement asynchronous execution mode. return Unimplemented( "Asynchronous execution on stream is not yet supported on CPU."); |