about summary refs log tree commit diff homepage
path: root/tensorflow/compiler/xla/service/cpu/parallel_cpu_executable.cc
diff options
context:
space:
mode:
Diffstat (limited to 'tensorflow/compiler/xla/service/cpu/parallel_cpu_executable.cc')
-rw-r--r-- tensorflow/compiler/xla/service/cpu/parallel_cpu_executable.cc | 146
1 files changed, 32 insertions, 114 deletions
diff --git a/tensorflow/compiler/xla/service/cpu/parallel_cpu_executable.cc b/tensorflow/compiler/xla/service/cpu/parallel_cpu_executable.cc
index 0077e344e2..d1b88b27f0 100644
--- a/tensorflow/compiler/xla/service/cpu/parallel_cpu_executable.cc
+++ b/tensorflow/compiler/xla/service/cpu/parallel_cpu_executable.cc
@@ -376,19 +376,6 @@ Status ParallelCpuExecutable::ExecuteComputeFunctions(
tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments,
tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> buffers,
HloExecutionProfile* hlo_execution_profile) {
- std::vector<se::DeviceMemoryBase> argument_buffers(arguments.size());
- for (int i = 0; i < arguments.size(); ++i) {
- argument_buffers[i] = arguments[i]->buffer(/*index=*/{});
- }
- return ExecuteComputeFunctions(run_options, argument_buffers, buffers,
- hlo_execution_profile);
-}
-
-Status ParallelCpuExecutable::ExecuteComputeFunctions(
- const ServiceExecutableRunOptions* run_options,
- tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> arguments,
- tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> buffers,
- HloExecutionProfile* hlo_execution_profile) {
// Allocate profiling counters for each hlo instruction that we would like to
// profile.
std::vector<int64>* profile_counters = nullptr;
@@ -428,8 +415,9 @@ Status ParallelCpuExecutable::ExecuteComputeFunctions(
// just copy the existing buffer into the map containing instruction
// results..
if (instruction->opcode() == HloOpcode::kParameter) {
- InsertOrDie(&results, instruction,
- arguments[instruction->parameter_number()].opaque());
+ InsertOrDie(
+ &results, instruction,
+ arguments[instruction->parameter_number()]->root_buffer().opaque());
} else if (instruction->opcode() == HloOpcode::kConstant) {
unsigned char* aligned_data =
FindOrDie(aligned_constants_, instruction).get();
@@ -461,69 +449,6 @@ Status ParallelCpuExecutable::ExecuteComputeFunctions(
return Status::OK();
}
-StatusOr<perftools::gputools::DeviceMemoryBase>
-ParallelCpuExecutable::ExecuteOnStream(
- const ServiceExecutableRunOptions* run_options,
- tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> arguments,
- HloExecutionProfile* hlo_execution_profile) {
- se::Stream* stream = run_options->stream();
- DeviceMemoryAllocator* memory_allocator = run_options->allocator();
- VLOG(3) << "ExecuteOnStream arg size: " << arguments.size();
- if (!arguments.empty()) {
- VLOG(3) << "ExecuteOnStream arg[0]: " << arguments.at(0).opaque();
- }
-
- // Allocate the temporary buffers required for the computation.
- se::StreamExecutor* stream_executor = stream->parent();
- int device_ordinal = stream_executor->device_ordinal();
- int64 buffer_count = assignment_->Allocations().size();
- VLOG(3) << "temp buffer count: " << buffer_count;
-
- std::vector<se::DeviceMemoryBase> device_allocations(
- assignment_->Allocations().size());
- TF_RETURN_IF_ERROR(AllocateBuffers(memory_allocator,
- stream->parent()->device_ordinal(),
- &device_allocations));
-
- TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice result_slice,
- assignment_->GetUniqueTopLevelOutputSlice());
- const BufferAllocation::Index result_index = result_slice.index();
- VLOG(3) << "result index: " << result_index;
-
- TF_RETURN_IF_ERROR(ExecuteComputeFunctions(
- run_options, arguments, device_allocations, hlo_execution_profile));
-
- // Mark the buffers that are actually live (used in the output) when the
- // computation finishes executing.
- std::unordered_set<const void*> marked_addresses;
- MarkLiveAddressesInOutput(device_allocations[result_index].opaque(),
- result_shape(), &marked_addresses);
-
- VLOG(3) << "Live addresses in output marking found "
- << marked_addresses.size() << " addresses:\n"
- << tensorflow::str_util::Join(
- marked_addresses, ", ", [](string* out, const void* address) {
- tensorflow::strings::StrAppend(
- out, tensorflow::strings::Printf("%p", address));
- });
-
- // Computation is done - deallocate temp buffers. Keep those marked
- // live because they are referenced by the output of the computation
- // and are needed by the service. They will be deallocated by the
- // service.
- for (size_t i = 0; i < device_allocations.size(); ++i) {
- auto alloc = device_allocations[i];
- if (marked_addresses.count(alloc.opaque()) == 0 &&
- alloc.opaque() != nullptr) {
- VLOG(3) << "ParallelCpuExecutable deallocating buffer #" << i << " ["
- << alloc.opaque() << "]";
- TF_RETURN_IF_ERROR(memory_allocator->Deallocate(device_ordinal, &alloc));
- }
- }
-
- return device_allocations[result_index];
-}
-
StatusOr<std::unique_ptr<ShapedBuffer>> ParallelCpuExecutable::ExecuteOnStream(
const ServiceExecutableRunOptions* run_options,
tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments,
@@ -536,9 +461,9 @@ StatusOr<std::unique_ptr<ShapedBuffer>> ParallelCpuExecutable::ExecuteOnStream(
DeviceMemoryAllocator* memory_allocator = run_options->allocator();
std::vector<se::DeviceMemoryBase> buffers(assignment_->Allocations().size());
- auto result_buffer =
- MakeUnique<ShapedBuffer>(result_shape(), stream->parent()->platform(),
- stream->parent()->device_ordinal());
+ auto result_buffer = MakeUnique<ShapedBuffer>(
+ /*on_host_shape=*/result_shape(), /*on_device_shape=*/result_shape(),
+ stream->parent()->platform(), stream->parent()->device_ordinal());
TF_RETURN_IF_ERROR(AllocateBuffers(
memory_allocator, stream->parent()->device_ordinal(), &buffers));
@@ -549,37 +474,30 @@ StatusOr<std::unique_ptr<ShapedBuffer>> ParallelCpuExecutable::ExecuteOnStream(
// Copy DeviceMemoryBase values which into the respective location in
// ShapedBuffer which is returned to the caller.
std::vector<bool> buffers_in_result(assignment_->Allocations().size(), false);
- TF_RETURN_IF_ERROR(
- result_buffer->mutable_shape_index_to_buffer_entry()
- ->ForEachMutableElementWithStatus(
- [&buffers, &buffers_in_result, &result_buffer, this](
- const ShapeIndex& index, size_t* buffer_entry) {
- const auto& sources =
- this->GetRootPointsToSet().element(index);
- // The points to set is unambiguous so the set should be a
- // singleton.
- CHECK_EQ(1, sources.size());
- const LogicalBuffer* buffer_source = sources[0];
- HloInstruction* src = buffer_source->instruction();
-
- // The source for this result buffer can be a nested buffer
- // such as a tuple element.
-
- // The source instruction should have a non-parameter buffer
- // assigned.
- TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice slice,
- this->assignment_->GetUniqueSlice(
- src, buffer_source->index()));
- CHECK(!slice.allocation()->is_entry_computation_parameter());
-
- const BufferAllocation::Index buffer_index = slice.index();
- const se::DeviceMemoryBase& buffer = buffers[buffer_index];
- CHECK(!buffer.is_null() || buffer.size() == 0);
- *buffer_entry = result_buffer->mutable_buffers()->size();
- result_buffer->mutable_buffers()->push_back(buffer);
- buffers_in_result[buffer_index] = true;
- return Status::OK();
- }));
+ TF_RETURN_IF_ERROR(result_buffer->buffers().ForEachMutableElementWithStatus(
+ [&](const ShapeIndex& index, se::DeviceMemoryBase* device_memory) {
+ const auto& sources = this->GetRootPointsToSet().element(index);
+
+ // The points to set is unambiguous so the set should be a singleton.
+ CHECK_EQ(1, sources.size());
+ const LogicalBuffer* buffer_source = sources[0];
+ HloInstruction* src = buffer_source->instruction();
+
+ // The source for this result buffer can be a nested buffer such as a
+ // tuple element. The source instruction should have a non-parameter
+ // buffer assigned.
+ TF_ASSIGN_OR_RETURN(
+ const BufferAllocation::Slice slice,
+ this->assignment_->GetUniqueSlice(src, buffer_source->index()));
+ CHECK(!slice.allocation()->is_entry_computation_parameter());
+
+ const BufferAllocation::Index buffer_index = slice.index();
+ const se::DeviceMemoryBase& buffer = buffers[buffer_index];
+ CHECK(!buffer.is_null() || buffer.size() == 0);
+ *device_memory = buffer;
+ buffers_in_result[buffer_index] = true;
+ return Status::OK();
+ }));
// Free all buffers not in the result.
for (size_t i = 0; i < buffers.size(); ++i) {
@@ -595,10 +513,10 @@ StatusOr<std::unique_ptr<ShapedBuffer>> ParallelCpuExecutable::ExecuteOnStream(
return std::move(result_buffer);
}
-StatusOr<perftools::gputools::DeviceMemoryBase>
+StatusOr<std::unique_ptr<ShapedBuffer>>
ParallelCpuExecutable::ExecuteAsyncOnStream(
const ServiceExecutableRunOptions* run_options,
- tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> arguments) {
+ tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments) {
// TODO(b/30671675): Implement asynchronous execution mode.
return Unimplemented(
"Asynchronous execution on stream is not yet supported on CPU.");