author     2017-06-14 12:30:57 -0700
committer  2017-06-14 12:34:37 -0700
commit     1856fe120d015fb17be303ef2da3873a10a6ffa6
tree       c9237be41bf5b540457fa4efcf08b01a2b84b1b5
parent     7e3e768d264fdbfd8ce9dbafe0ea08c671c8f7f0
InfeedTuple support for GPU backend.
PiperOrigin-RevId: 159009451
7 files changed, 153 insertions(+), 68 deletions(-)
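At a high level, the change teaches the GPU infeed path to handle non-nested tuples: each tuple element travels through the infeed queue as its own buffer, and the tuple's outer device buffer receives a small table of raw device pointers to those element buffers (the tuple_element_addresses memcpy in infeed_thunk.cc below). As a rough sketch of that pointer-table layout only — DeviceBuffer and BuildTuplePointerTable are invented names for illustration, not part of the patch:

    #include <vector>

    // Stand-in for a device allocation; `opaque` plays the role of
    // DeviceMemoryBase::opaque() in the patch.
    struct DeviceBuffer {
      void* opaque;  // raw device address of one tuple element buffer
    };

    // Builds the host-side pointer table that the thunk copies into the
    // tuple's outer buffer: one void* per element, so the table occupies
    // elements.size() * sizeof(void*) bytes on device.
    std::vector<void*> BuildTuplePointerTable(
        const std::vector<DeviceBuffer>& elements) {
      std::vector<void*> table;
      table.reserve(elements.size());
      for (const DeviceBuffer& e : elements) {
        table.push_back(e.opaque);
      }
      return table;
    }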
diff --git a/tensorflow/compiler/xla/service/gpu/infeed_manager.cc b/tensorflow/compiler/xla/service/gpu/infeed_manager.cc
index 120a3f7fba..8b948d89f5 100644
--- a/tensorflow/compiler/xla/service/gpu/infeed_manager.cc
+++ b/tensorflow/compiler/xla/service/gpu/infeed_manager.cc
@@ -13,8 +13,10 @@ See the License for the specific language governing permissions and
 limitations under the License.
 ==============================================================================*/
-#include "tensorflow/compiler/xla/ptr_util.h"
 #include "tensorflow/compiler/xla/service/gpu/infeed_manager.h"
+
+#include "tensorflow/compiler/xla/map_util.h"
+#include "tensorflow/compiler/xla/ptr_util.h"
 #include "tensorflow/core/platform/logging.h"
 
 namespace se = ::perftools::gputools;
@@ -22,23 +24,23 @@ namespace se = ::perftools::gputools;
 namespace xla {
 namespace gpu {
 
-InfeedManager::InfeedManager()
-    : current_buffer_(nullptr),
-      host_to_device_executor_(nullptr) {}
+InfeedManager::InfeedManager() : host_to_device_executor_(nullptr) {}
 
 void InfeedManager::Reset() {
   tensorflow::mutex_lock l(mu_);
-  CHECK(!current_buffer_);
+  CHECK(dequeued_buffer_.empty());
   for (auto buffer : enqueued_buffer_) {
     buffer->Done();
   }
   enqueued_buffer_.clear();
 }
 
-void InfeedManager::EnqueueBuffer(InfeedBuffer* buffer) {
+void InfeedManager::EnqueueBuffers(std::vector<InfeedBuffer*> buffers) {
   tensorflow::mutex_lock l(mu_);
   bool was_empty = enqueued_buffer_.empty();
-  enqueued_buffer_.push_back(buffer);
+  for (gpu::InfeedBuffer* b : buffers) {
+    enqueued_buffer_.push_back(b);
+  }
   if (was_empty) {
     // This has the potential to suffer from the notified thread
     // immediately trying and failing to acquire mu_, but seems
@@ -53,18 +55,23 @@ InfeedBuffer* InfeedManager::BlockingDequeueBuffer() {
   while (enqueued_buffer_.empty()) {
     cv_.wait(l);
   }
-  CHECK(!current_buffer_);
-  current_buffer_ = enqueued_buffer_.front();
+  InfeedBuffer* current_buffer = enqueued_buffer_.front();
   enqueued_buffer_.pop_front();
-  return current_buffer_;
+  dequeued_buffer_.insert(current_buffer);
+  return current_buffer;
 }
 
-void InfeedManager::ReleaseCurrentBuffer(se::DeviceMemoryBase* device_memory) {
-  tensorflow::mutex_lock l(mu_);
-  CHECK(current_buffer_);
-  CHECK(device_memory->IsSameAs(*current_buffer_->device_memory()));
-  current_buffer_->Done();
-  current_buffer_ = nullptr;
+void InfeedManager::ReleaseBuffers(std::vector<InfeedBuffer*> buffers) {
+  {
+    tensorflow::mutex_lock l(mu_);
+    for (gpu::InfeedBuffer* b : buffers) {
+      CHECK(ContainsKey(dequeued_buffer_, b));
+      dequeued_buffer_.erase(b);
+    }
+  }
+  for (gpu::InfeedBuffer* b : buffers) {
+    b->Done();
+  }
 }
 
 se::Stream* InfeedManager::GetStream(se::StreamExecutor* executor) {
diff --git a/tensorflow/compiler/xla/service/gpu/infeed_manager.h b/tensorflow/compiler/xla/service/gpu/infeed_manager.h
index 50d0ce340f..23fbc5bd08 100644
--- a/tensorflow/compiler/xla/service/gpu/infeed_manager.h
+++ b/tensorflow/compiler/xla/service/gpu/infeed_manager.h
@@ -23,6 +23,7 @@ limitations under the License.
 #include <deque>
 
 #include "tensorflow/compiler/xla/types.h"
+#include "tensorflow/core/lib/gtl/flatset.h"
 #include "tensorflow/core/platform/mutex.h"
 #include "tensorflow/core/platform/stream_executor_no_cuda.h"
 
@@ -81,25 +82,19 @@ class InfeedManager {
   // condition is to call Reset when no computation is taking place.
   void Reset();
 
-  // Adds buffer to the infeed queue. buffer->Done will be called when
-  // the buffer will no longer be accessed by the InfeedManager,
-  // either as a result of a call to Reset or because the runtime has
-  // dequeued and used the buffer.
-  void EnqueueBuffer(InfeedBuffer* buffer);
+  // Adds a set of buffers to the infeed queue atomically. buffer->Done
+  // will be called when the buffer will no longer be accessed by the
+  // InfeedManager, either as a result of a call to Reset or because the
+  // runtime has dequeued and used the buffer.
+  void EnqueueBuffers(std::vector<InfeedBuffer*> buffers);
 
   // Blocks until the infeed queue is non-empty, then returns the
-  // buffer at the head of the queue. Sets the current buffer to be
-  // the returned buffer. It is an error to call BlockingDequeueBuffer
-  // if there is an unreleased current buffer, i.e.,
-  // ReleaseCurrentBuffer must be called between calls to
-  // BlockingDequeueBuffer.
+  // buffer at the head of the queue. Adds the current buffer to the
+  // to-be released set.
   InfeedBuffer* BlockingDequeueBuffer();
 
-  // Releases the current buffer, which is the last buffer returned by
-  // BlockingDequeueBuffer and not yet released. device_memory must
-  // match that of the current buffer.
-  void ReleaseCurrentBuffer(
-      perftools::gputools::DeviceMemoryBase* device_memory);
+  // Releases a set of buffers from the to-be released set.
+  void ReleaseBuffers(std::vector<InfeedBuffer*> buffers);
 
   // Returns a cached stream associated with an executor. Allocates a
   // new stream on the first invocation. On subsequent invocations, if
@@ -109,18 +104,25 @@ class InfeedManager {
       perftools::gputools::StreamExecutor* executor);
 
  private:
+  // TODO(b/30467474): Revisit if this mutex becomes a point of
+  // contention.
   tensorflow::mutex mu_;
+
   // Condition variable that is signaled every time a buffer is
   // enqueued to an empty queue.
   tensorflow::condition_variable cv_;
+
   // InfeedBuffer* queue contents are not owned, but buffer->Done must
   // be called when the buffer is no longer needed by the runtime.
   std::deque<InfeedBuffer*> enqueued_buffer_;
-  // If non-NULL, the buffer that is currently being processed by the
+
+  // Buffers that are dequeued and currently being processed by the
   // runtime. Not owned.
-  InfeedBuffer* current_buffer_;
+  tensorflow::gtl::FlatSet<const InfeedBuffer*> dequeued_buffer_;
+
   // Cached host to device stream for queuing infeed data.
   std::unique_ptr<perftools::gputools::Stream> host_to_device_stream_;
+
   // Executor that the host_to_device_stream belongs to. Not owned.
   perftools::gputools::StreamExecutor* host_to_device_executor_;
 };
diff --git a/tensorflow/compiler/xla/service/gpu/infeed_thunk.cc b/tensorflow/compiler/xla/service/gpu/infeed_thunk.cc
index 6f144c7273..e33e904692 100644
--- a/tensorflow/compiler/xla/service/gpu/infeed_thunk.cc
+++ b/tensorflow/compiler/xla/service/gpu/infeed_thunk.cc
@@ -21,31 +21,59 @@ limitations under the License.
 namespace xla {
 namespace gpu {
 
-InfeedThunk::InfeedThunk(const BufferAllocation::Slice& destination_buffer,
-                         uint64 mem_size, const HloInstruction* hlo_instruction)
+InfeedThunk::InfeedThunk(
+    tensorflow::gtl::ArraySlice<BufferAllocation::Slice> tuple_element_buffers,
+    const BufferAllocation::Slice& destination_buffer,
+    const HloInstruction* hlo_instruction)
     : Thunk(Kind::kInfeed, hlo_instruction),
-      destination_buffer_(destination_buffer),
-      mem_size_(mem_size) {}
+      tuple_element_buffers_(tuple_element_buffers.begin(),
+                             tuple_element_buffers.end()),
+      destination_buffer_(destination_buffer) {}
 
 tensorflow::Status InfeedThunk::ExecuteOnStream(
     const BufferAllocations& buffer_allocations,
     perftools::gputools::Stream* stream) {
   VLOG(2) << "Infeeding to GPU ";
-  perftools::gputools::DeviceMemoryBase destination_data =
+
+  perftools::gputools::DeviceMemoryBase destination_address =
       buffer_allocations.GetDeviceAddress(destination_buffer_);
 
   InfeedManager* infeed_manager = GetOrCreateInfeedManager();
-  InfeedBuffer* buffer = infeed_manager->BlockingDequeueBuffer();
-  CHECK_EQ(buffer->length(), mem_size_);
-  stream->ThenMemcpy(&destination_data, *(buffer->device_memory()),
-                     buffer->length());
+  std::vector<InfeedBuffer*> infeed_buffers;
+  if (ShapeUtil::IsTuple(hlo_instruction()->shape())) {
+    CHECK(!ShapeUtil::IsNestedTuple(hlo_instruction()->shape()));
+    // Transfer the tuple elements first.
+    std::vector<void*> tuple_element_addresses;
+    for (BufferAllocation::Slice tuple_element_buffer :
+         tuple_element_buffers_) {
+      perftools::gputools::DeviceMemoryBase tuple_element_address =
+          buffer_allocations.GetDeviceAddress(tuple_element_buffer);
+
+      InfeedBuffer* buffer = infeed_manager->BlockingDequeueBuffer();
+      infeed_buffers.push_back(buffer);
+      stream->ThenMemcpy(&tuple_element_address, *(buffer->device_memory()),
+                         buffer->length());
+      tuple_element_addresses.push_back(tuple_element_address.opaque());
+    }
+    // Transfer the tuple outer buffer.
+    auto host_size = tuple_element_addresses.size() * sizeof(void*);
+    stream->ThenMemcpy(&destination_address, tuple_element_addresses.data(),
+                       host_size);
+  } else {
+    InfeedBuffer* buffer = infeed_manager->BlockingDequeueBuffer();
+    infeed_buffers.push_back(buffer);
+    stream->ThenMemcpy(&destination_address, *(buffer->device_memory()),
+                       buffer->length());
+  }
+
   if (!stream->BlockHostUntilDone()) {
     return InternalError("Failed to complete data transfer on stream %p",
                          stream);
   }
-  // Since Infeeds are totally ordered, no other infeed should sneak
-  // in and we should be able to release the same buffer we dequeued.
-  infeed_manager->ReleaseCurrentBuffer(buffer->device_memory());
+
+  infeed_manager->ReleaseBuffers(infeed_buffers);
+
+  VLOG(2) << "Infeeding to GPU complete";
   return tensorflow::Status::OK();
 }
diff --git a/tensorflow/compiler/xla/service/gpu/infeed_thunk.h b/tensorflow/compiler/xla/service/gpu/infeed_thunk.h
index 0a808186c2..371d71f9db 100644
--- a/tensorflow/compiler/xla/service/gpu/infeed_thunk.h
+++ b/tensorflow/compiler/xla/service/gpu/infeed_thunk.h
@@ -35,8 +35,10 @@ class InfeedThunk : public Thunk {
   // infeed queue to the device buffer
   // `destination_buffer`. `mem_size` is the size of the data in
   // bytes.
-  InfeedThunk(const BufferAllocation::Slice& destination_buffer,
-              uint64 mem_size, const HloInstruction* hlo_instruction);
+  InfeedThunk(tensorflow::gtl::ArraySlice<BufferAllocation::Slice>
+                  tuple_element_buffers,
+              const BufferAllocation::Slice& destination_buffer,
+              const HloInstruction* hlo_instruction);
 
   InfeedThunk(const InfeedThunk&) = delete;
   InfeedThunk& operator=(const InfeedThunk&) = delete;
@@ -46,8 +48,8 @@ class InfeedThunk : public Thunk {
       perftools::gputools::Stream* stream) override;
 
  private:
+  const std::vector<BufferAllocation::Slice> tuple_element_buffers_;
   const BufferAllocation::Slice destination_buffer_;
-  const uint64 mem_size_;
 };
 
 }  // namespace gpu
diff --git a/tensorflow/compiler/xla/service/gpu/ir_emitter_unnested.cc b/tensorflow/compiler/xla/service/gpu/ir_emitter_unnested.cc
index 5fa2bfdd7e..e44b645342 100644
--- a/tensorflow/compiler/xla/service/gpu/ir_emitter_unnested.cc
+++ b/tensorflow/compiler/xla/service/gpu/ir_emitter_unnested.cc
@@ -1659,12 +1659,18 @@ std::unique_ptr<Thunk> IrEmitterUnnested::BuildCopyThunk(
 std::unique_ptr<Thunk> IrEmitterUnnested::BuildInfeedThunk(
     const HloInstruction* inst) {
   CHECK_EQ(HloOpcode::kInfeed, inst->opcode());
+
+  std::vector<BufferAllocation::Slice> tuple_element_buffers;
+  for (int64 i = 0; i < inst->shape().tuple_shapes_size(); ++i) {
+    BufferAllocation::Slice buffer = ir_emitter_context_->buffer_assignment()
+                                         .GetUniqueSlice(inst, {i})
+                                         .ConsumeValueOrDie();
+    tuple_element_buffers.push_back(buffer);
+  }
+
   return MakeUnique<InfeedThunk>(
-      /*destination_buffer=*/GetAllocationSlice(*inst),
-      /*mem_size=*/
-      llvm_ir::ByteSizeOf(inst->shape(),
-                          ir_emitter_context_->llvm_module()->getDataLayout()),
-      inst);
+      tuple_element_buffers,
+      /*destination_buffer=*/GetAllocationSlice(*inst), inst);
 }
 
 std::unique_ptr<Thunk> IrEmitterUnnested::BuildGemmThunk(
diff --git a/tensorflow/compiler/xla/service/gpu_transfer_manager.cc b/tensorflow/compiler/xla/service/gpu_transfer_manager.cc
index 4b8d190a46..4971de74ae 100644
--- a/tensorflow/compiler/xla/service/gpu_transfer_manager.cc
+++ b/tensorflow/compiler/xla/service/gpu_transfer_manager.cc
@@ -20,7 +20,6 @@ limitations under the License.
 #include <vector>
 
 #include "tensorflow/compiler/xla/literal_util.h"
-#include "tensorflow/compiler/xla/service/gpu/infeed_manager.h"
 #include "tensorflow/compiler/xla/shape_util.h"
 #include "tensorflow/compiler/xla/status_macros.h"
 #include "tensorflow/compiler/xla/statusor.h"
@@ -44,16 +43,60 @@ GpuTransferManager::GpuTransferManager()
 Status GpuTransferManager::TransferLiteralToInfeed(se::StreamExecutor* executor,
                                                    const Literal& literal) {
   const Shape& shape = literal.shape();
-  VLOG(2) << "Transferring literal shape to infeed: "
+  VLOG(2) << "Transferring literal to infeed with shape: "
          << ShapeUtil::HumanString(shape);
 
-  // TODO(b/30467474) handle tuples.
+  std::vector<gpu::InfeedBuffer*> buffers;
+
   if (ShapeUtil::IsTuple(shape)) {
-    return Unimplemented("Infeed with a tuple shape is not supported: %s",
-                         ShapeUtil::HumanString(literal.shape()).c_str());
+    if (ShapeUtil::IsNestedTuple(shape)) {
+      return Unimplemented(
+          "Infeed with a nested tuple shape is not supported: %s",
+          ShapeUtil::HumanString(literal.shape()).c_str());
+    }
+
+    for (const auto& tuple_element : literal.tuple_literals()) {
+      TF_ASSIGN_OR_RETURN(
+          gpu::InfeedBuffer * buffer,
+          TransferLiteralToInfeedInternal(executor, tuple_element));
+      buffers.push_back(buffer);
+    }
+  } else {
+    TF_ASSIGN_OR_RETURN(gpu::InfeedBuffer * buffer,
+                        TransferLiteralToInfeedInternal(executor, literal));
+    buffers.push_back(buffer);
+  }
+
+  gpu::InfeedManager* infeed_manager = gpu::GetOrCreateInfeedManager();
+  se::Stream* stream = infeed_manager->GetStream(executor);
+
+  // TODO(b/30467474): Since this stream is shared across different
+  // infeed requests, blocking on the stream might be
+  // heavy-handed. Figure out if finer-grained acknowledgement is
+  // possible.
+  if (!stream->BlockHostUntilDone()) {
+    for (gpu::InfeedBuffer* b : buffers) {
+      b->Done();
+    }
+    return InternalError("Failed to complete data transfer on stream %p",
+                         stream);
   }
 
+  infeed_manager->EnqueueBuffers(buffers);
+
+  VLOG(2) << "Infeed data transferred";
+
+  return Status::OK();
+}
+
+StatusOr<gpu::InfeedBuffer*>
+GpuTransferManager::TransferLiteralToInfeedInternal(
+    se::StreamExecutor* executor, const Literal& literal) {
+  const Shape& shape = literal.shape();
+  CHECK(!ShapeUtil::IsTuple(shape));
+
   int64 size = GetByteSizeRequirement(shape);
+
   if (size > std::numeric_limits<int32>::max()) {
     return Unimplemented("Infeed shape is too large: %s needs %lld bytes",
                          ShapeUtil::HumanString(literal.shape()).c_str(),
                          size);
@@ -76,16 +119,7 @@ Status GpuTransferManager::TransferLiteralToInfeed(se::StreamExecutor* executor,
 
   VLOG(2) << "Queued infeed data on stream " << stream;
 
-  if (!stream->BlockHostUntilDone()) {
-    buffer->Done();
-    return InternalError("Failed to complete data transfer on stream %p",
-                         stream);
-  }
-
-  infeed_manager->EnqueueBuffer(buffer);
-
-  VLOG(2) << "Infeed data transferred";
-  return Status::OK();
+  return buffer;
 }
 
 }  // namespace xla
diff --git a/tensorflow/compiler/xla/service/gpu_transfer_manager.h b/tensorflow/compiler/xla/service/gpu_transfer_manager.h
index 6dfe7ba029..6b736b9b9a 100644
--- a/tensorflow/compiler/xla/service/gpu_transfer_manager.h
+++ b/tensorflow/compiler/xla/service/gpu_transfer_manager.h
@@ -19,6 +19,7 @@ limitations under the License.
 #include <vector>
 
 #include "tensorflow/compiler/xla/service/generic_transfer_manager.h"
+#include "tensorflow/compiler/xla/service/gpu/infeed_manager.h"
 #include "tensorflow/compiler/xla/service/transfer_manager.h"
 #include "tensorflow/compiler/xla/statusor.h"
 #include "tensorflow/compiler/xla/xla_data.pb.h"
@@ -39,6 +40,11 @@ class GpuTransferManager : public GenericTransferManager {
                                  const Literal& literal) override;
 
  private:
+  // Internal helper function for TransferLiteralToInfeed(). Input
+  // literal cannot be a tuple.
+  StatusOr<gpu::InfeedBuffer*> TransferLiteralToInfeedInternal(
+      perftools::gputools::StreamExecutor* executor, const Literal& literal);
+
   TF_DISALLOW_COPY_AND_ASSIGN(GpuTransferManager);
 };
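A note on the new InfeedManager protocol visible above: buffers are enqueued in one batch under a single lock acquisition (so a tuple's elements appear in the queue atomically), dequeued one at a time into a set of in-flight buffers, and released and Done()'d as a batch once the thunk's copies complete. Below is a self-contained sketch of that queue discipline, using standard-library primitives in place of tensorflow::mutex and tensorflow::condition_variable; BatchedQueue and Buffer are illustrative stand-ins, not XLA types:

    #include <condition_variable>
    #include <deque>
    #include <mutex>
    #include <unordered_set>
    #include <vector>

    struct Buffer {
      void Done() {}  // stand-in for InfeedBuffer::Done
    };

    class BatchedQueue {
     public:
      // Enqueues a whole batch while holding the lock once, mirroring
      // InfeedManager::EnqueueBuffers.
      void EnqueueBuffers(const std::vector<Buffer*>& buffers) {
        std::unique_lock<std::mutex> l(mu_);
        bool was_empty = enqueued_.empty();
        for (Buffer* b : buffers) enqueued_.push_back(b);
        if (was_empty) cv_.notify_one();
      }

      // Blocks until a buffer is available, then moves it to the
      // in-flight set, mirroring BlockingDequeueBuffer.
      Buffer* BlockingDequeueBuffer() {
        std::unique_lock<std::mutex> l(mu_);
        cv_.wait(l, [this] { return !enqueued_.empty(); });
        Buffer* b = enqueued_.front();
        enqueued_.pop_front();
        dequeued_.insert(b);
        return b;
      }

      // Removes a batch from the in-flight set under the lock, then
      // calls Done outside the lock, mirroring ReleaseBuffers.
      void ReleaseBuffers(const std::vector<Buffer*>& buffers) {
        {
          std::unique_lock<std::mutex> l(mu_);
          for (Buffer* b : buffers) dequeued_.erase(b);
        }
        for (Buffer* b : buffers) b->Done();
      }

     private:
      std::mutex mu_;
      std::condition_variable cv_;
      std::deque<Buffer*> enqueued_;          // waiting to be consumed
      std::unordered_set<Buffer*> dequeued_;  // dequeued but not released
    };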