author     A. Unique TensorFlower <gardener@tensorflow.org>  2018-07-24 13:50:22 -0700
committer  TensorFlower Gardener <gardener@tensorflow.org>   2018-07-24 13:55:16 -0700
commit     ee0bd6ef450b388fadea63b31b65b13bd12f17d6 (patch)
tree       a7575f0449f0235a8cf9a05a3207f6f60990e943
parent     6b2ae9b3da572d2f8e1eccdf2922efce43cbecd4 (diff)
Automated rollback of commit 0ea6847c892497afdd20c1150fee1e532612ca17
PiperOrigin-RevId: 205885304
-rw-r--r--  tensorflow/compiler/jit/xla_compilation_cache.cc            |  18
-rw-r--r--  tensorflow/compiler/jit/xla_device_context.cc               | 117
-rw-r--r--  tensorflow/compiler/jit/xla_device_context.h                |   5
-rw-r--r--  tensorflow/compiler/jit/xla_tensor.cc                       |   4
-rw-r--r--  tensorflow/compiler/xla/service/executable.cc               |  13
-rw-r--r--  tensorflow/compiler/xla/service/hlo_runner.cc               |   9
-rw-r--r--  tensorflow/compiler/xla/tests/local_client_execute_test.cc  |   4
-rw-r--r--  tensorflow/compiler/xla/tests/local_client_test_base.cc     |  14
-rw-r--r--  tensorflow/compiler/xla/tests/xla_hlo_profile_test.cc       |   1
-rw-r--r--  tensorflow/stream_executor/host/host_gpu_executor.cc        |   2
-rw-r--r--  tensorflow/stream_executor/stream.cc                        |   6
11 files changed, 143 insertions(+), 50 deletions(-)
diff --git a/tensorflow/compiler/jit/xla_compilation_cache.cc b/tensorflow/compiler/jit/xla_compilation_cache.cc
index 7ed609c437..54a41a4daa 100644
--- a/tensorflow/compiler/jit/xla_compilation_cache.cc
+++ b/tensorflow/compiler/jit/xla_compilation_cache.cc
@@ -40,7 +40,23 @@ namespace tensorflow {
XlaCompilationCache::XlaCompilationCache(xla::LocalClient* client,
DeviceType device_type)
: client_(client), device_type_(std::move(device_type)) {}
-XlaCompilationCache::~XlaCompilationCache() = default;
+XlaCompilationCache::~XlaCompilationCache() {
+ // Ensure all uses of our programs have completed by waiting for all stream
+ // executors to complete.
+ for (auto* executor : client_->backend().stream_executors()) {
+ bool ok = executor->SynchronizeAllActivity();
+ if (!ok) {
+ LOG(ERROR) << "Error synchronizing activity while waiting for all "
+ "programs to complete";
+ }
+ }
+ // TODO(b/110813685): Think about the program ownership model. Programs are
+ // currently owned by the compilation cache which means we must wait for
+ // program completion in the destructor. There are multiple compilation caches
+ // around, which complicates things a little. Perhaps having programs be
+ // shared_ptrs (an invasive change) would make the model easier to reason
+ // about?
+}
string XlaCompilationCache::DebugString() {
return "XLA JIT compilation cache";
diff --git a/tensorflow/compiler/jit/xla_device_context.cc b/tensorflow/compiler/jit/xla_device_context.cc
index 04778c0090..8cf198239c 100644
--- a/tensorflow/compiler/jit/xla_device_context.cc
+++ b/tensorflow/compiler/jit/xla_device_context.cc
@@ -74,43 +74,64 @@ Status XlaTransferManager::TransferLiteralToDevice(
xla::Shape xla_shape;
TF_RETURN_IF_ERROR(TensorShapeToXLAShape(host_tensor.dtype(),
host_tensor.shape(), &xla_shape));
- xla::BorrowingLiteral literal(
+ // Create a reference to hold onto host_tensor until after the literal has
+ // been transferred. Also make sure the literal exists until the function
+ // asynchronously completes, as it will be wrapped in an xla::LiteralSlice.
+ TensorReference ref(host_tensor);
+ auto literal = std::make_shared<xla::BorrowingLiteral>(
static_cast<const char*>(DMAHelper::base(&host_tensor)), xla_shape);
XlaTensor* xla_tensor = XlaTensor::FromTensor(device_tensor);
const xla::ShapedBuffer& shaped_buffer = xla_tensor->shaped_buffer();
- VLOG(1) << "Transfer to device as literal: " << literal.ToString() << " "
+ VLOG(1) << "Transfer to device as literal: " << literal->ToString() << " "
<< shaped_buffer.ToString();
- TF_RETURN_IF_ERROR(transfer_manager_->TransferLiteralToDevice(
- host_to_device_stream_, literal, shaped_buffer));
+ if (UseMultipleStreams()) {
+ // Initially wait for the compute stream so that memory allocations are
+ // synchronized.
+ host_to_device_stream_->ThenWaitFor(stream_);
+ }
+ TF_RETURN_IF_ERROR(transfer_manager_->TransferLiteralToDeviceAsync(
+ host_to_device_stream_, *literal, shaped_buffer));
if (UseMultipleStreams()) {
se::Event event(stream_->parent());
TF_RET_CHECK(event.Init()) << "Event failed to initialize!";
host_to_device_stream_->ThenRecordEvent(&event);
xla_tensor->SetDefinedOn(host_to_device_stream_, std::move(event));
}
+ // Unref the host tensor, and capture the literal shared_ptr too so it goes
+ // out of scope when the lambda completes.
+ host_to_device_stream_->ThenDoHostCallback([ref, literal]() { ref.Unref(); });
return Status::OK();
}
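
The keep-alive pattern used here — hold a TensorReference on the host tensor, move the literal into a shared_ptr, and release both from a host callback enqueued behind the transfer — generalizes to any asynchronous device API. A minimal standalone sketch of the idea, with std:: stand-ins replacing the StreamExecutor types:

    #include <functional>
    #include <iostream>
    #include <memory>
    #include <vector>

    // Stand-in for se::Stream: enqueued work runs later, in order.
    class FakeStream {
     public:
      void ThenDoHostCallback(std::function<void()> fn) {
        work_.push_back(std::move(fn));
      }
      void BlockHostUntilDone() {
        for (auto& fn : work_) fn();
        work_.clear();
      }

     private:
      std::vector<std::function<void()>> work_;
    };

    int main() {
      FakeStream stream;
      // The "literal" must outlive the asynchronous transfer, so it is moved
      // into a shared_ptr that the completion callback captures by value.
      auto literal = std::make_shared<std::vector<char>>(1024, 0);
      std::weak_ptr<std::vector<char>> watcher = literal;

      stream.ThenDoHostCallback([literal]() {
        std::cout << "transfer done, literal size " << literal->size() << "\n";
      });
      literal.reset();  // caller's own reference goes away immediately...

      std::cout << "alive before drain: " << !watcher.expired() << "\n";  // 1
      stream.BlockHostUntilDone();
      std::cout << "alive after drain: " << !watcher.expired() << "\n";   // 0
    }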
-Status XlaTransferManager::TransferLiteralFromDevice(
- Tensor* host_tensor, const Tensor& device_tensor) const {
+void XlaTransferManager::TransferLiteralFromDevice(
+ Tensor* host_tensor, const Tensor& device_tensor,
+ const StatusCallback& done) const {
const xla::ShapedBuffer& shaped_buffer =
XlaTensor::FromTensor(&device_tensor)->shaped_buffer();
- TF_ASSIGN_OR_RETURN(std::unique_ptr<xla::Literal> literal,
- transfer_manager_->TransferLiteralFromDevice(
- device_to_host_stream_, shaped_buffer));
- VLOG(1) << "Transfer from device as literal: " << literal->ToString() << " "
- << shaped_buffer.ToString();
- Tensor tensor;
- TF_RETURN_IF_ERROR(
- LiteralToHostTensor(*literal, host_tensor->dtype(), &tensor));
- // Reshape the tensor back to its declared shape.
- if (!host_tensor->CopyFrom(tensor, device_tensor.shape())) {
- return errors::Internal(
- "Tensor::CopyFrom failed when copying from XLA device to CPU");
- }
- return Status::OK();
+ TensorReference ref(device_tensor);
+ transfer_manager_->TransferLiteralFromDevice(
+ device_to_host_stream_, shaped_buffer,
+ [=, &shaped_buffer](
+ xla::StatusOr<std::unique_ptr<xla::Literal> > literal_or) {
+ ref.Unref();
+ done([&]() -> Status {
+ TF_ASSIGN_OR_RETURN(auto literal, std::move(literal_or));
+ VLOG(1) << "Transfer from device as literal: " << literal->ToString()
+ << " " << shaped_buffer.ToString();
+ Tensor tensor;
+ TF_RETURN_IF_ERROR(
+ LiteralToHostTensor(*literal, host_tensor->dtype(), &tensor));
+ // Reshape the tensor back to its declared shape.
+ Status status;
+ if (!host_tensor->CopyFrom(tensor, device_tensor.shape())) {
+ status = errors::Internal(
+ "Tensor::CopyFrom failed when copying from XLA device to CPU");
+ }
+ return status;
+ }());
+ });
}
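
Replacing the Status return with a StatusCallback turns the read-back into a continuation: the caller supplies what to do once the literal arrives instead of blocking for it. A small standalone sketch of the call shape, modeling StatusCallback as std::function<void(Status)> with a toy Status type (not the TensorFlow one):

    #include <functional>
    #include <iostream>
    #include <string>

    struct Status {
      bool ok = true;
      std::string message;
    };
    using StatusCallback = std::function<void(Status)>;

    // Hypothetical async read-back: invokes `done` once the copy finishes,
    // typically from a host callback enqueued on the device-to-host stream.
    void TransferFromDevice(StatusCallback done) {
      done(Status{true, ""});
    }

    int main() {
      TransferFromDevice([](Status s) {
        if (!s.ok) {
          std::cerr << "transfer failed: " << s.message << "\n";
        } else {
          std::cout << "tensor ready on host\n";
        }
      });
    }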
void XlaTransferManager::CopyCPUTensorToDevice(const Tensor* cpu_tensor,
@@ -163,6 +184,12 @@ void XlaTransferManager::CopyCPUTensorToDevice(const Tensor* cpu_tensor,
return;
}
status = TransferLiteralToDevice(reshaped_cpu_tensor, device_tensor);
+ if (status.ok()) {
+ xla_tensor->set_host_tensor(*cpu_tensor);
+ host_to_device_stream_->ThenDoHostCallback(
+ [done]() { done(Status::OK()); });
+ return;
+ }
} else {
se::DeviceMemoryBase dev_dst_ptr =
XlaTensor::DeviceMemoryFromTensor(*device_tensor);
@@ -212,7 +239,8 @@ void XlaTransferManager::CopyDeviceTensorToCPU(const Tensor* device_tensor,
Status status;
if (transfer_as_literal_) {
- status = TransferLiteralFromDevice(cpu_tensor, *device_tensor);
+ TransferLiteralFromDevice(cpu_tensor, *device_tensor, done);
+ return;
} else {
device_to_host_stream_->ThenMemcpy(dst_ptr, dev_src_ptr, total_bytes);
// TODO(hpucha): Make this asynchronous.
@@ -234,15 +262,15 @@ void XlaTransferManager::CopyDeviceTensorToDevice(const Tensor& src_tensor,
<< reinterpret_cast<const void*>(src_tensor.tensor_data().data())
<< " "
<< reinterpret_cast<const void*>(dst_tensor->tensor_data().data());
- // TODO(phawkins): replace this code with an asynchronous implementation.
- auto body = [&]() {
+ // Perform memory allocation now, and enqueue the device-to-device transfer.
+ Status status = [&]() -> Status {
if (src_tensor.NumElements() == 0) {
return Status::OK();
}
// TODO(jmolloy): We co-opt the device_to_host stream for device to device
// transfers; perhaps we should have a dedicated device to device stream? or
// one per device?
- auto device_to_device_stream = device_to_host_stream_;
+ auto device_to_device_stream = stream_;
XlaTensor* xla_src = XlaTensor::FromTensor(&src_tensor);
XlaTensor* xla_dst = XlaTensor::FromTensor(dst_tensor);
CHECK(xla_src && xla_dst)
@@ -254,29 +282,40 @@ void XlaTransferManager::CopyDeviceTensorToDevice(const Tensor& src_tensor,
TF_RETURN_IF_ERROR(
xla_dst->AllocateShapedBuffer(src_tensor.dtype(), shape, client_,
stream_->parent()->device_ordinal()));
+ if (stream_ != device_to_device_stream) {
+ // Initially wait for the compute stream so that memory allocations are
+ // synchronized.
+ device_to_device_stream->ThenWaitFor(stream_);
+ }
}
if (se::Event* event =
xla_src->GetDefinitionEvent(device_to_device_stream)) {
device_to_device_stream->ThenWaitFor(event);
xla_src->SetDefinedOn(device_to_device_stream);
- TF_RETURN_IF_ERROR(device_to_device_stream->BlockHostUntilDone());
}
- TF_RETURN_IF_ERROR(
- xla_dst->shaped_buffer().buffers().ForEachMutableElementWithStatus(
- [&](const xla::ShapeIndex& index, se::DeviceMemoryBase* buffer) {
- const se::DeviceMemoryBase& from_buffer =
- xla_src->shaped_buffer().buffers().element(index);
- CHECK_EQ(buffer->size(), from_buffer.size());
- if (!stream_->parent()->SynchronousMemcpy(buffer, from_buffer,
- buffer->size())) {
- return errors::Internal("Device to device memcpy failed");
- }
- return Status::OK();
- }));
+
+ auto from_iter = xla_src->shaped_buffer().buffers().begin();
+ auto to_iter = xla_dst->shaped_buffer().buffers().begin();
+ for (auto end_iter = xla_src->shaped_buffer().buffers().end();
+ from_iter != end_iter; ++from_iter, ++to_iter) {
+ device_to_device_stream->ThenMemcpyD2D(
+ &to_iter->second, from_iter->second, to_iter->second.size());
+ }
+
+ if (UseMultipleStreams()) {
+ se::Event event(stream_->parent());
+ CHECK(event.Init());
+ device_to_device_stream->ThenRecordEvent(&event);
+ xla_dst->SetDefinedOn(device_to_device_stream, std::move(event));
+ }
return Status::OK();
- };
- done(body());
+ }();
+ if (!status.ok()) {
+ return done(status);
+ } else {
+ stream_->ThenDoHostCallback([=]() { done(Status::OK()); });
+ }
}
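
The event dance above — record an event on the stream that produces a buffer, wait on it from any stream that consumes the buffer — orders work across streams without stalling the host. A standalone sketch with std::promise/std::future standing in for se::Event (illustration only, not the StreamExecutor API):

    #include <future>
    #include <iostream>
    #include <thread>
    #include <vector>

    int main() {
      std::vector<int> buffer(4, 0);
      std::promise<void> defined;                 // plays the role of se::Event
      std::future<void> ready = defined.get_future();

      // Producing "stream": fills the buffer, then records the event.
      std::thread producer([&] {
        for (int& x : buffer) x = 42;             // the D2D memcpys
        defined.set_value();                      // ThenRecordEvent
      });

      // Consuming "stream": waits for the definition event before reading.
      std::thread consumer([&] {
        ready.wait();                             // ThenWaitFor(event)
        std::cout << "first element: " << buffer[0] << "\n";
      });

      producer.join();
      consumer.join();
    }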
XlaDeviceContext::XlaDeviceContext(
diff --git a/tensorflow/compiler/jit/xla_device_context.h b/tensorflow/compiler/jit/xla_device_context.h
index c726495f96..912f8d779e 100644
--- a/tensorflow/compiler/jit/xla_device_context.h
+++ b/tensorflow/compiler/jit/xla_device_context.h
@@ -66,8 +66,9 @@ class XlaTransferManager {
private:
Status TransferLiteralToDevice(const Tensor& host_tensor,
Tensor* device_tensor) const;
- Status TransferLiteralFromDevice(Tensor* host_tensor,
- const Tensor& device_tensor) const;
+ void TransferLiteralFromDevice(Tensor* host_tensor,
+ const Tensor& device_tensor,
+ const StatusCallback& done) const;
bool UseMultipleStreams() const { return stream_ != host_to_device_stream_; }
// The main compute stream of the device, used to synchronize the transfer
diff --git a/tensorflow/compiler/jit/xla_tensor.cc b/tensorflow/compiler/jit/xla_tensor.cc
index 5dff187fff..d777dfa5a3 100644
--- a/tensorflow/compiler/jit/xla_tensor.cc
+++ b/tensorflow/compiler/jit/xla_tensor.cc
@@ -92,10 +92,8 @@ se::Event* XlaTensor::GetDefinitionEvent(se::Stream* stream) {
void XlaTensor::SetDefinedOn(se::Stream* stream, se::Event event) {
mutex_lock lock(mu_);
- CHECK(!definition_event_.has_value())
- << "SetDefinedOn must only be called once!";
definition_event_ = std::move(event);
- streams_defined_on_.push_back(stream);
+ streams_defined_on_ = {stream};
}
void XlaTensor::SetDefinedOn(se::Stream* stream) {
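
Removing the only-once CHECK lets a tensor be redefined, for example when it is written again on a different stream; SetDefinedOn now replaces both the event and the stream list rather than appending. A tiny standalone sketch of that replace-not-append semantics (toy types, not the real XlaTensor):

    #include <iostream>
    #include <optional>
    #include <vector>

    struct Event { int id; };
    using Stream = int;

    struct TensorSync {
      std::optional<Event> definition_event;
      std::vector<Stream*> streams_defined_on;

      void SetDefinedOn(Stream* stream, Event event) {
        definition_event = event;       // overwrite: latest definition wins
        streams_defined_on = {stream};  // reset, do not append
      }
    };

    int main() {
      Stream s1 = 0, s2 = 1;
      TensorSync t;
      t.SetDefinedOn(&s1, Event{1});
      t.SetDefinedOn(&s2, Event{2});    // legal now; previously CHECK-failed
      std::cout << "event " << t.definition_event->id << ", streams "
                << t.streams_defined_on.size() << "\n";  // event 2, streams 1
    }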
diff --git a/tensorflow/compiler/xla/service/executable.cc b/tensorflow/compiler/xla/service/executable.cc
index 7cf2746947..fd75847d0c 100644
--- a/tensorflow/compiler/xla/service/executable.cc
+++ b/tensorflow/compiler/xla/service/executable.cc
@@ -82,7 +82,18 @@ StatusOr<ScopedShapedBuffer> Executable::ExecuteOnStreamWrapper(
StatusOr<ScopedShapedBuffer> return_value =
ExecuteOnStream(run_options, arguments, profile_ptr.get());
- TF_RETURN_IF_ERROR(return_value.status());
+ if (!return_value.status().ok()) {
+ if (profile != nullptr) {
+ // Ensure the ThenStartTimer call has completed before we destroy the timer.
+ // We already have a failure status to return, so just log this if it
+ // fails.
+ Status status = stream->BlockHostUntilDone();
+ if (!status.ok()) {
+ LOG(ERROR) << "Failed to BlockHostUntilDone: " << status;
+ }
+ }
+ return return_value.status();
+ }
if (profile != nullptr) {
VLOG(1) << "enqueueing 'stop timer' and blocking host until done...";
diff --git a/tensorflow/compiler/xla/service/hlo_runner.cc b/tensorflow/compiler/xla/service/hlo_runner.cc
index 4f0569f405..b2725e2918 100644
--- a/tensorflow/compiler/xla/service/hlo_runner.cc
+++ b/tensorflow/compiler/xla/service/hlo_runner.cc
@@ -180,8 +180,12 @@ StatusOr<ScopedShapedBuffer> HloRunner::ExecuteWithDeviceBuffers(
TF_ASSIGN_OR_RETURN(std::unique_ptr<Executable> executable,
CreateExecutable(std::move(module), run_hlo_passes));
- return executable->ExecuteOnStreamWrapper(&service_run_options,
- /*profile=*/profile, arguments);
+ TF_ASSIGN_OR_RETURN(
+ ScopedShapedBuffer retval,
+ executable->ExecuteOnStreamWrapper(&service_run_options,
+ /*profile=*/profile, arguments));
+ TF_RETURN_IF_ERROR(stream.BlockHostUntilDone());
+ return std::move(retval);
}
StatusOr<ScopedShapedBuffer> HloRunner::ExecuteWithDeviceBuffers(
@@ -309,6 +313,7 @@ StatusOr<std::vector<std::unique_ptr<Literal>>> HloRunner::ExecuteReplicated(
std::vector<std::unique_ptr<Literal>> exec_results;
for (int64 i = 0; i < options.num_replicas; ++i) {
+ TF_RETURN_IF_ERROR(streams[i]->BlockHostUntilDone());
TF_ASSIGN_OR_RETURN(std::unique_ptr<Literal> literal,
backend().transfer_manager()->TransferLiteralFromDevice(
streams[i].get(), results[i]));
diff --git a/tensorflow/compiler/xla/tests/local_client_execute_test.cc b/tensorflow/compiler/xla/tests/local_client_execute_test.cc
index 2f4d197ae6..5c3498c84c 100644
--- a/tensorflow/compiler/xla/tests/local_client_execute_test.cc
+++ b/tensorflow/compiler/xla/tests/local_client_execute_test.cc
@@ -772,6 +772,10 @@ XLA_TEST_F(LocalClientExecuteTest, CompileExecutable) {
ScopedShapedBuffer result =
executable->Run({&x_array}, DefaultExecutableRunOptions())
.ConsumeValueOrDie();
+ ASSERT_IS_OK(local_client_->mutable_backend()
+ ->BorrowStream(0)
+ .ValueOrDie()
+ ->BlockHostUntilDone());
LiteralTestUtil::ExpectR1Near<float>(
{2.0f, 4.0f, 6.0f}, *ShapedBufferToLiteral(result), error_spec_);
diff --git a/tensorflow/compiler/xla/tests/local_client_test_base.cc b/tensorflow/compiler/xla/tests/local_client_test_base.cc
index 88797a7d0a..c31ba0e713 100644
--- a/tensorflow/compiler/xla/tests/local_client_test_base.cc
+++ b/tensorflow/compiler/xla/tests/local_client_test_base.cc
@@ -189,7 +189,19 @@ StatusOr<ScopedShapedBuffer> LocalClientTestBase::ExecuteLocally(
TF_ASSIGN_OR_RETURN(
std::unique_ptr<LocalExecutable> executable,
local_client_->Compile(computation, argument_layouts, build_options));
- return executable->Run(arguments, run_options);
+ TF_ASSIGN_OR_RETURN(auto ret, executable->Run(arguments, run_options));
+
+ auto device_ordinal =
+ build_options.device_ordinal() == -1 ? 0 : build_options.device_ordinal();
+ auto* stream = run_options.stream();
+ if (!stream) {
+ stream = local_client_->mutable_backend()
+ ->BorrowStream(device_ordinal)
+ .ValueOrDie()
+ .get();
+ }
+ TF_RETURN_IF_ERROR(stream->BlockHostUntilDone());
+ return std::move(ret);
}
} // namespace xla
diff --git a/tensorflow/compiler/xla/tests/xla_hlo_profile_test.cc b/tensorflow/compiler/xla/tests/xla_hlo_profile_test.cc
index 4d4dd62a3f..c000ff4dc8 100644
--- a/tensorflow/compiler/xla/tests/xla_hlo_profile_test.cc
+++ b/tensorflow/compiler/xla/tests/xla_hlo_profile_test.cc
@@ -172,6 +172,7 @@ void ExecuteAndFetchProfile(string* profile_output, LocalClient* client,
auto execution_result,
executable->ExecuteOnStream(&run_options, {&lhs_arg, &rhs_arg},
&hlo_execution_profile));
+ TF_ASSERT_OK(stream_ptr->BlockHostUntilDone());
(void)execution_result;
*profile_output =
diff --git a/tensorflow/stream_executor/host/host_gpu_executor.cc b/tensorflow/stream_executor/host/host_gpu_executor.cc
index 3cd97b3cf1..8adf739b17 100644
--- a/tensorflow/stream_executor/host/host_gpu_executor.cc
+++ b/tensorflow/stream_executor/host/host_gpu_executor.cc
@@ -93,7 +93,7 @@ bool HostExecutor::MemcpyDeviceToDevice(Stream *stream,
// the nature of the HostExecutor) memcpy on the stream (HostStream)
// associated with the HostExecutor.
AsHostStream(stream)->EnqueueTask(
- [src_mem, dst_mem, size]() { memcpy(src_mem, dst_mem, size); });
+ [src_mem, dst_mem, size]() { memcpy(dst_mem, src_mem, size); });
return true;
}
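
This one-character-pair fix matters because memcpy's signature is memcpy(void* dest, const void* src, size_t n); the old call copied in the wrong direction, clobbering the source instead of filling the destination. A minimal demonstration:

    #include <cstring>
    #include <iostream>

    int main() {
      char src[] = "payload";
      char dst[sizeof(src)] = {0};
      // Correct order: destination first, then source.
      std::memcpy(dst, src, sizeof(src));
      std::cout << dst << "\n";  // payload
      // The buggy call, memcpy(src, dst, size), would instead overwrite the
      // source with the (zeroed) destination.
    }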
diff --git a/tensorflow/stream_executor/stream.cc b/tensorflow/stream_executor/stream.cc
index ca1b8e28e6..2c495c99e1 100644
--- a/tensorflow/stream_executor/stream.cc
+++ b/tensorflow/stream_executor/stream.cc
@@ -268,6 +268,12 @@ Stream::~Stream() {
VLOG_CALL();
temporary_memory_manager_.ForceDeallocateAll();
+ // Ensure all work enqueued on the stream has completed.
+ auto status = BlockHostUntilDone();
+ if (!status.ok()) {
+ LOG(WARNING) << "Error blocking host until done in stream destructor: "
+ << status;
+ }
if (allocated_) {
parent_->DeallocateStream(this);
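
Blocking in ~Stream is what makes the host callbacks introduced elsewhere in this change safe: callbacks that unref tensors or release literals are guaranteed to run before the stream's resources go away. A standalone sketch of the drain-on-destruction pattern (toy class, not the StreamExecutor implementation):

    #include <functional>
    #include <iostream>
    #include <vector>

    // A stream that refuses to die with work still queued: destruction drains
    // pending host callbacks first, so captured resources are released in
    // order instead of dangling.
    class DrainingStream {
     public:
      void ThenDoHostCallback(std::function<void()> fn) {
        pending_.push_back(std::move(fn));
      }
      ~DrainingStream() {
        for (auto& fn : pending_) fn();  // the BlockHostUntilDone() in ~Stream
      }

     private:
      std::vector<std::function<void()>> pending_;
    };

    int main() {
      {
        DrainingStream stream;
        stream.ThenDoHostCallback([] { std::cout << "unref host tensor\n"; });
      }  // destructor runs the callback before the stream goes away
    }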