author    A. Unique TensorFlower <gardener@tensorflow.org>  2017-06-14 12:30:57 -0700
committer TensorFlower Gardener <gardener@tensorflow.org>  2017-06-14 12:34:37 -0700
commit    1856fe120d015fb17be303ef2da3873a10a6ffa6
tree      c9237be41bf5b540457fa4efcf08b01a2b84b1b5
parent    7e3e768d264fdbfd8ce9dbafe0ea08c671c8f7f0
InfeedTuple support for GPU backend.
PiperOrigin-RevId: 159009451
 tensorflow/compiler/xla/service/gpu/infeed_manager.cc      | 39
 tensorflow/compiler/xla/service/gpu/infeed_manager.h       | 36
 tensorflow/compiler/xla/service/gpu/infeed_thunk.cc        | 52
 tensorflow/compiler/xla/service/gpu/infeed_thunk.h         |  8
 tensorflow/compiler/xla/service/gpu/ir_emitter_unnested.cc | 16
 tensorflow/compiler/xla/service/gpu_transfer_manager.cc    | 64
 tensorflow/compiler/xla/service/gpu_transfer_manager.h     |  6
 7 files changed, 153 insertions(+), 68 deletions(-)
diff --git a/tensorflow/compiler/xla/service/gpu/infeed_manager.cc b/tensorflow/compiler/xla/service/gpu/infeed_manager.cc
index 120a3f7fba..8b948d89f5 100644
--- a/tensorflow/compiler/xla/service/gpu/infeed_manager.cc
+++ b/tensorflow/compiler/xla/service/gpu/infeed_manager.cc
@@ -13,8 +13,10 @@ See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
-#include "tensorflow/compiler/xla/ptr_util.h"
#include "tensorflow/compiler/xla/service/gpu/infeed_manager.h"
+
+#include "tensorflow/compiler/xla/map_util.h"
+#include "tensorflow/compiler/xla/ptr_util.h"
#include "tensorflow/core/platform/logging.h"
namespace se = ::perftools::gputools;
@@ -22,23 +24,23 @@ namespace se = ::perftools::gputools;
namespace xla {
namespace gpu {
-InfeedManager::InfeedManager()
- : current_buffer_(nullptr),
- host_to_device_executor_(nullptr) {}
+InfeedManager::InfeedManager() : host_to_device_executor_(nullptr) {}
void InfeedManager::Reset() {
tensorflow::mutex_lock l(mu_);
- CHECK(!current_buffer_);
+ CHECK(dequeued_buffer_.empty());
for (auto buffer : enqueued_buffer_) {
buffer->Done();
}
enqueued_buffer_.clear();
}
-void InfeedManager::EnqueueBuffer(InfeedBuffer* buffer) {
+void InfeedManager::EnqueueBuffers(std::vector<InfeedBuffer*> buffers) {
tensorflow::mutex_lock l(mu_);
bool was_empty = enqueued_buffer_.empty();
- enqueued_buffer_.push_back(buffer);
+ for (gpu::InfeedBuffer* b : buffers) {
+ enqueued_buffer_.push_back(b);
+ }
if (was_empty) {
// This has the potential to suffer from the notified thread
// immediately trying and failing to acquire mu_, but seems
@@ -53,18 +55,23 @@ InfeedBuffer* InfeedManager::BlockingDequeueBuffer() {
while (enqueued_buffer_.empty()) {
cv_.wait(l);
}
- CHECK(!current_buffer_);
- current_buffer_ = enqueued_buffer_.front();
+ InfeedBuffer* current_buffer = enqueued_buffer_.front();
enqueued_buffer_.pop_front();
- return current_buffer_;
+ dequeued_buffer_.insert(current_buffer);
+ return current_buffer;
}
-void InfeedManager::ReleaseCurrentBuffer(se::DeviceMemoryBase* device_memory) {
- tensorflow::mutex_lock l(mu_);
- CHECK(current_buffer_);
- CHECK(device_memory->IsSameAs(*current_buffer_->device_memory()));
- current_buffer_->Done();
- current_buffer_ = nullptr;
+void InfeedManager::ReleaseBuffers(std::vector<InfeedBuffer*> buffers) {
+ {
+ tensorflow::mutex_lock l(mu_);
+ for (gpu::InfeedBuffer* b : buffers) {
+ CHECK(ContainsKey(dequeued_buffer_, b));
+ dequeued_buffer_.erase(b);
+ }
+ }
+ for (gpu::InfeedBuffer* b : buffers) {
+ b->Done();
+ }
}
se::Stream* InfeedManager::GetStream(se::StreamExecutor* executor) {
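Taken together, the manager now supports a simple multi-buffer lifecycle: the producer enqueues all buffers of one infeed atomically, the consumer dequeues them one by one, and releases the whole set at the end. A minimal sketch using only calls that appear in this patch; the wrapper function and the two pre-populated buffers are assumptions for illustration.

#include <vector>
#include "tensorflow/compiler/xla/service/gpu/infeed_manager.h"

void SketchTupleInfeedLifecycle(xla::gpu::InfeedBuffer* b0,
                                xla::gpu::InfeedBuffer* b1) {
  xla::gpu::InfeedManager* manager = xla::gpu::GetOrCreateInfeedManager();

  // Producer side (GpuTransferManager): enqueue both tuple elements in one
  // call so a concurrent consumer cannot interleave another infeed between them.
  manager->EnqueueBuffers({b0, b1});

  // Consumer side (InfeedThunk): dequeue one buffer per tuple element, in order.
  std::vector<xla::gpu::InfeedBuffer*> dequeued;
  dequeued.push_back(manager->BlockingDequeueBuffer());
  dequeued.push_back(manager->BlockingDequeueBuffer());

  // ... device-to-device copies into the destination slices happen here ...

  // Release the whole set; this is what finally calls Done() on each buffer.
  manager->ReleaseBuffers(dequeued);
}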
diff --git a/tensorflow/compiler/xla/service/gpu/infeed_manager.h b/tensorflow/compiler/xla/service/gpu/infeed_manager.h
index 50d0ce340f..23fbc5bd08 100644
--- a/tensorflow/compiler/xla/service/gpu/infeed_manager.h
+++ b/tensorflow/compiler/xla/service/gpu/infeed_manager.h
@@ -23,6 +23,7 @@ limitations under the License.
#include <deque>
#include "tensorflow/compiler/xla/types.h"
+#include "tensorflow/core/lib/gtl/flatset.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/stream_executor_no_cuda.h"
@@ -81,25 +82,19 @@ class InfeedManager {
// condition is to call Reset when no computation is taking place.
void Reset();
- // Adds buffer to the infeed queue. buffer->Done will be called when
- // the buffer will no longer be accessed by the InfeedManager,
- // either as a result of a call to Reset or because the runtime has
- // dequeued and used the buffer.
- void EnqueueBuffer(InfeedBuffer* buffer);
+ // Adds a set of buffers to the infeed queue atomically. buffer->Done
+ // will be called when the buffer will no longer be accessed by the
+ // InfeedManager, either as a result of a call to Reset or because the
+ // runtime has dequeued and used the buffer.
+ void EnqueueBuffers(std::vector<InfeedBuffer*> buffers);
// Blocks until the infeed queue is non-empty, then returns the
- // buffer at the head of the queue. Sets the current buffer to be
- // the returned buffer. It is an error to call BlockingDequeueBuffer
- // if there is an unreleased current buffer, i.e.,
- // ReleaseCurrentBuffer must be called between calls to
- // BlockingDequeueBuffer.
+ // buffer at the head of the queue. Adds the current buffer to the
+ // to-be released set.
InfeedBuffer* BlockingDequeueBuffer();
- // Releases the current buffer, which is the last buffer returned by
- // BlockingDequeueBuffer and not yet released. device_memory must
- // match that of the current buffer.
- void ReleaseCurrentBuffer(
- perftools::gputools::DeviceMemoryBase* device_memory);
+ // Releases a set of buffers from the to-be released set.
+ void ReleaseBuffers(std::vector<InfeedBuffer*> buffers);
// Returns a cached stream associated with an executor. Allocates a
// new stream on the first invocation. On subsequent invocations, if
@@ -109,18 +104,25 @@ class InfeedManager {
perftools::gputools::StreamExecutor* executor);
private:
+ // TODO(b/30467474): Revisit if this mutex becomes a point of
+ // contention.
tensorflow::mutex mu_;
+
// Condition variable that is signaled every time a buffer is
// enqueued to an empty queue.
tensorflow::condition_variable cv_;
+
// InfeedBuffer* queue contents are not owned, but buffer->Done must
// be called when the buffer is no longer needed by the runtime.
std::deque<InfeedBuffer*> enqueued_buffer_;
- // If non-NULL, the buffer that is currently being processed by the
+
+ // Buffers that are dequeued and currently being processed by the
// runtime. Not owned.
- InfeedBuffer* current_buffer_;
+ tensorflow::gtl::FlatSet<const InfeedBuffer*> dequeued_buffer_;
+
// Cached host to device stream for queuing infeed data.
std::unique_ptr<perftools::gputools::Stream> host_to_device_stream_;
+
// Executor that the host_to_device_stream belongs to. Not owned.
perftools::gputools::StreamExecutor* host_to_device_executor_;
};
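For orientation, the InfeedBuffer interface this change leans on is defined earlier in infeed_manager.h, outside the hunks shown. The sketch below is reconstructed from the calls visible in this patch and is an assumption, not the real declaration; the actual class also owns its device allocation.

// Assumed shape of InfeedBuffer, inferred from buffer->device_memory(),
// buffer->length(), and buffer->Done() as used in this patch.
class InfeedBufferSketch {
 public:
  // Device allocation holding one infeed (tuple-element) payload.
  perftools::gputools::DeviceMemoryBase* device_memory();
  // Payload size in bytes.
  int64 length();
  // Hands the buffer back to its producer once the runtime is done with it.
  void Done();
};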
diff --git a/tensorflow/compiler/xla/service/gpu/infeed_thunk.cc b/tensorflow/compiler/xla/service/gpu/infeed_thunk.cc
index 6f144c7273..e33e904692 100644
--- a/tensorflow/compiler/xla/service/gpu/infeed_thunk.cc
+++ b/tensorflow/compiler/xla/service/gpu/infeed_thunk.cc
@@ -21,31 +21,59 @@ limitations under the License.
namespace xla {
namespace gpu {
-InfeedThunk::InfeedThunk(const BufferAllocation::Slice& destination_buffer,
- uint64 mem_size, const HloInstruction* hlo_instruction)
+InfeedThunk::InfeedThunk(
+ tensorflow::gtl::ArraySlice<BufferAllocation::Slice> tuple_element_buffers,
+ const BufferAllocation::Slice& destination_buffer,
+ const HloInstruction* hlo_instruction)
: Thunk(Kind::kInfeed, hlo_instruction),
- destination_buffer_(destination_buffer),
- mem_size_(mem_size) {}
+ tuple_element_buffers_(tuple_element_buffers.begin(),
+ tuple_element_buffers.end()),
+ destination_buffer_(destination_buffer) {}
tensorflow::Status InfeedThunk::ExecuteOnStream(
const BufferAllocations& buffer_allocations,
perftools::gputools::Stream* stream) {
VLOG(2) << "Infeeding to GPU ";
- perftools::gputools::DeviceMemoryBase destination_data =
+
+ perftools::gputools::DeviceMemoryBase destination_address =
buffer_allocations.GetDeviceAddress(destination_buffer_);
InfeedManager* infeed_manager = GetOrCreateInfeedManager();
- InfeedBuffer* buffer = infeed_manager->BlockingDequeueBuffer();
- CHECK_EQ(buffer->length(), mem_size_);
- stream->ThenMemcpy(&destination_data, *(buffer->device_memory()),
- buffer->length());
+ std::vector<InfeedBuffer*> infeed_buffers;
+ if (ShapeUtil::IsTuple(hlo_instruction()->shape())) {
+ CHECK(!ShapeUtil::IsNestedTuple(hlo_instruction()->shape()));
+ // Transfer the tuple elements first.
+ std::vector<void*> tuple_element_addresses;
+ for (BufferAllocation::Slice tuple_element_buffer :
+ tuple_element_buffers_) {
+ perftools::gputools::DeviceMemoryBase tuple_element_address =
+ buffer_allocations.GetDeviceAddress(tuple_element_buffer);
+
+ InfeedBuffer* buffer = infeed_manager->BlockingDequeueBuffer();
+ infeed_buffers.push_back(buffer);
+ stream->ThenMemcpy(&tuple_element_address, *(buffer->device_memory()),
+ buffer->length());
+ tuple_element_addresses.push_back(tuple_element_address.opaque());
+ }
+ // Transfer the tuple outer buffer.
+ auto host_size = tuple_element_addresses.size() * sizeof(void*);
+ stream->ThenMemcpy(&destination_address, tuple_element_addresses.data(),
+ host_size);
+ } else {
+ InfeedBuffer* buffer = infeed_manager->BlockingDequeueBuffer();
+ infeed_buffers.push_back(buffer);
+ stream->ThenMemcpy(&destination_address, *(buffer->device_memory()),
+ buffer->length());
+ }
+
if (!stream->BlockHostUntilDone()) {
return InternalError("Failed to complete data transfer on stream %p",
stream);
}
- // Since Infeeds are totally ordered, no other infeed should sneak
- // in and we should be able to release the same buffer we dequeued.
- infeed_manager->ReleaseCurrentBuffer(buffer->device_memory());
+
+ infeed_manager->ReleaseBuffers(infeed_buffers);
+
+ VLOG(2) << "Infeeding to GPU complete";
return tensorflow::Status::OK();
}
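The outer tuple buffer written in the tuple branch above is just a flat array of the element device addresses. A standalone illustration of the size computation, with made-up addresses:

#include <cstdio>
#include <vector>

int main() {
  // For a 3-element (non-nested) tuple, destination_buffer_ ends up holding
  // three device pointers, i.e. 3 * sizeof(void*) = 24 bytes on a 64-bit host.
  std::vector<void*> tuple_element_addresses = {
      reinterpret_cast<void*>(0x7f0000001000),   // element 0 (made up)
      reinterpret_cast<void*>(0x7f0000002000),   // element 1 (made up)
      reinterpret_cast<void*>(0x7f0000003000)};  // element 2 (made up)
  auto host_size = tuple_element_addresses.size() * sizeof(void*);
  std::printf("outer tuple buffer: %zu bytes of device pointers\n", host_size);
  // In the thunk this host array is then copied to destination_address:
  //   stream->ThenMemcpy(&destination_address,
  //                      tuple_element_addresses.data(), host_size);
  return 0;
}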
diff --git a/tensorflow/compiler/xla/service/gpu/infeed_thunk.h b/tensorflow/compiler/xla/service/gpu/infeed_thunk.h
index 0a808186c2..371d71f9db 100644
--- a/tensorflow/compiler/xla/service/gpu/infeed_thunk.h
+++ b/tensorflow/compiler/xla/service/gpu/infeed_thunk.h
@@ -35,8 +35,10 @@ class InfeedThunk : public Thunk {
// infeed queue to the device buffer
// `destination_buffer`. `mem_size` is the size of the data in
// bytes.
- InfeedThunk(const BufferAllocation::Slice& destination_buffer,
- uint64 mem_size, const HloInstruction* hlo_instruction);
+ InfeedThunk(tensorflow::gtl::ArraySlice<BufferAllocation::Slice>
+ tuple_element_buffers,
+ const BufferAllocation::Slice& destination_buffer,
+ const HloInstruction* hlo_instruction);
InfeedThunk(const InfeedThunk&) = delete;
InfeedThunk& operator=(const InfeedThunk&) = delete;
@@ -46,8 +48,8 @@ class InfeedThunk : public Thunk {
perftools::gputools::Stream* stream) override;
private:
+ const std::vector<BufferAllocation::Slice> tuple_element_buffers_;
const BufferAllocation::Slice destination_buffer_;
- const uint64 mem_size_;
};
} // namespace gpu
diff --git a/tensorflow/compiler/xla/service/gpu/ir_emitter_unnested.cc b/tensorflow/compiler/xla/service/gpu/ir_emitter_unnested.cc
index 5fa2bfdd7e..e44b645342 100644
--- a/tensorflow/compiler/xla/service/gpu/ir_emitter_unnested.cc
+++ b/tensorflow/compiler/xla/service/gpu/ir_emitter_unnested.cc
@@ -1659,12 +1659,18 @@ std::unique_ptr<Thunk> IrEmitterUnnested::BuildCopyThunk(
std::unique_ptr<Thunk> IrEmitterUnnested::BuildInfeedThunk(
const HloInstruction* inst) {
CHECK_EQ(HloOpcode::kInfeed, inst->opcode());
+
+ std::vector<BufferAllocation::Slice> tuple_element_buffers;
+ for (int64 i = 0; i < inst->shape().tuple_shapes_size(); ++i) {
+ BufferAllocation::Slice buffer = ir_emitter_context_->buffer_assignment()
+ .GetUniqueSlice(inst, {i})
+ .ConsumeValueOrDie();
+ tuple_element_buffers.push_back(buffer);
+ }
+
return MakeUnique<InfeedThunk>(
- /*destination_buffer=*/GetAllocationSlice(*inst),
- /*mem_size=*/
- llvm_ir::ByteSizeOf(inst->shape(),
- ir_emitter_context_->llvm_module()->getDataLayout()),
- inst);
+ tuple_element_buffers,
+ /*destination_buffer=*/GetAllocationSlice(*inst), inst);
}
std::unique_ptr<Thunk> IrEmitterUnnested::BuildGemmThunk(
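The ShapeIndex {i} passed to GetUniqueSlice selects the i-th element of the infeed's tuple shape; the outer pointer table still comes from GetAllocationSlice(*inst). A small sketch of the indices involved for a hypothetical (f32[8], s32[2]) infeed; the helper function is not part of this patch.

#include "tensorflow/compiler/xla/shape_util.h"

void SketchTupleElementIndices() {
  xla::Shape infeed_shape = xla::ShapeUtil::MakeTupleShape(
      {xla::ShapeUtil::MakeShape(xla::F32, {8}),
       xla::ShapeUtil::MakeShape(xla::S32, {2})});
  // tuple_shapes_size() == 2, so buffer assignment is queried at ShapeIndex
  // {0} and {1} for the element buffers; the instruction's own slice (via
  // GetAllocationSlice) holds the outer array of element pointers.
  for (int i = 0; i < infeed_shape.tuple_shapes_size(); ++i) {
    // GetUniqueSlice(inst, {i}) resolves the buffer for element i here.
  }
}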
diff --git a/tensorflow/compiler/xla/service/gpu_transfer_manager.cc b/tensorflow/compiler/xla/service/gpu_transfer_manager.cc
index 4b8d190a46..4971de74ae 100644
--- a/tensorflow/compiler/xla/service/gpu_transfer_manager.cc
+++ b/tensorflow/compiler/xla/service/gpu_transfer_manager.cc
@@ -20,7 +20,6 @@ limitations under the License.
#include <vector>
#include "tensorflow/compiler/xla/literal_util.h"
-#include "tensorflow/compiler/xla/service/gpu/infeed_manager.h"
#include "tensorflow/compiler/xla/shape_util.h"
#include "tensorflow/compiler/xla/status_macros.h"
#include "tensorflow/compiler/xla/statusor.h"
@@ -44,16 +43,60 @@ GpuTransferManager::GpuTransferManager()
Status GpuTransferManager::TransferLiteralToInfeed(se::StreamExecutor* executor,
const Literal& literal) {
const Shape& shape = literal.shape();
- VLOG(2) << "Transferring literal shape to infeed: "
+ VLOG(2) << "Transferring literal to infeed with shape: "
<< ShapeUtil::HumanString(shape);
- // TODO(b/30467474) handle tuples.
+ std::vector<gpu::InfeedBuffer*> buffers;
+
if (ShapeUtil::IsTuple(shape)) {
- return Unimplemented("Infeed with a tuple shape is not supported: %s",
- ShapeUtil::HumanString(literal.shape()).c_str());
+ if (ShapeUtil::IsNestedTuple(shape)) {
+ return Unimplemented(
+ "Infeed with a nested tuple shape is not supported: %s",
+ ShapeUtil::HumanString(literal.shape()).c_str());
+ }
+
+ for (const auto& tuple_element : literal.tuple_literals()) {
+ TF_ASSIGN_OR_RETURN(
+ gpu::InfeedBuffer * buffer,
+ TransferLiteralToInfeedInternal(executor, tuple_element));
+ buffers.push_back(buffer);
+ }
+ } else {
+ TF_ASSIGN_OR_RETURN(gpu::InfeedBuffer * buffer,
+ TransferLiteralToInfeedInternal(executor, literal));
+ buffers.push_back(buffer);
+ }
+
+ gpu::InfeedManager* infeed_manager = gpu::GetOrCreateInfeedManager();
+ se::Stream* stream = infeed_manager->GetStream(executor);
+
+ // TODO(b/30467474): Since this stream is shared across different
+ // infeed requests, blocking on the stream might be
+ // heavy-handed. Figure out if finer-grained acknowledgement is
+ // possible.
+ if (!stream->BlockHostUntilDone()) {
+ for (gpu::InfeedBuffer* b : buffers) {
+ b->Done();
+ }
+ return InternalError("Failed to complete data transfer on stream %p",
+ stream);
}
+ infeed_manager->EnqueueBuffers(buffers);
+
+ VLOG(2) << "Infeed data transferred";
+
+ return Status::OK();
+}
+
+StatusOr<gpu::InfeedBuffer*>
+GpuTransferManager::TransferLiteralToInfeedInternal(
+ se::StreamExecutor* executor, const Literal& literal) {
+ const Shape& shape = literal.shape();
+ CHECK(!ShapeUtil::IsTuple(shape));
+
int64 size = GetByteSizeRequirement(shape);
+
if (size > std::numeric_limits<int32>::max()) {
return Unimplemented("Infeed shape is too large: %s needs %lld bytes",
ShapeUtil::HumanString(literal.shape()).c_str(), size);
@@ -76,16 +119,7 @@ Status GpuTransferManager::TransferLiteralToInfeed(se::StreamExecutor* executor,
VLOG(2) << "Queued infeed data on stream " << stream;
- if (!stream->BlockHostUntilDone()) {
- buffer->Done();
- return InternalError("Failed to complete data transfer on stream %p",
- stream);
- }
-
- infeed_manager->EnqueueBuffer(buffer);
-
- VLOG(2) << "Infeed data transferred";
- return Status::OK();
+ return buffer;
}
} // namespace xla
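From a caller's point of view the entry point is unchanged; only its tuple handling is new. A hypothetical wrapper, assuming the caller already owns a non-nested tuple Literal and a StreamExecutor; the wrapper name is an assumption, the TransferLiteralToInfeed signature is from this patch.

#include "tensorflow/compiler/xla/service/gpu_transfer_manager.h"

tensorflow::Status FeedTupleToInfeed(
    perftools::gputools::StreamExecutor* executor,
    const xla::Literal& tuple_literal, xla::GpuTransferManager* manager) {
  // Each tuple element is staged into its own InfeedBuffer, then the whole
  // vector is enqueued atomically; nested tuples return Unimplemented.
  return manager->TransferLiteralToInfeed(executor, tuple_literal);
}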
diff --git a/tensorflow/compiler/xla/service/gpu_transfer_manager.h b/tensorflow/compiler/xla/service/gpu_transfer_manager.h
index 6dfe7ba029..6b736b9b9a 100644
--- a/tensorflow/compiler/xla/service/gpu_transfer_manager.h
+++ b/tensorflow/compiler/xla/service/gpu_transfer_manager.h
@@ -19,6 +19,7 @@ limitations under the License.
#include <vector>
#include "tensorflow/compiler/xla/service/generic_transfer_manager.h"
+#include "tensorflow/compiler/xla/service/gpu/infeed_manager.h"
#include "tensorflow/compiler/xla/service/transfer_manager.h"
#include "tensorflow/compiler/xla/statusor.h"
#include "tensorflow/compiler/xla/xla_data.pb.h"
@@ -39,6 +40,11 @@ class GpuTransferManager : public GenericTransferManager {
const Literal& literal) override;
private:
+ // Internal helper function for TransferLiteralToInfeed(). Input
+ // literal cannot be a tuple.
+ StatusOr<gpu::InfeedBuffer*> TransferLiteralToInfeedInternal(
+ perftools::gputools::StreamExecutor* executor, const Literal& literal);
+
TF_DISALLOW_COPY_AND_ASSIGN(GpuTransferManager);
};