Diffstat (limited to 'tensorflow/core/common_runtime/gpu')
33 files changed, 5197 insertions, 0 deletions
diff --git a/tensorflow/core/common_runtime/gpu/dma_helper.h b/tensorflow/core/common_runtime/gpu/dma_helper.h new file mode 100644 index 0000000000..7b0750f405 --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/dma_helper.h @@ -0,0 +1,18 @@ +#ifndef TENSORFLOW_COMMON_RUNTIME_GPU_DMA_HELPER_H_ +#define TENSORFLOW_COMMON_RUNTIME_GPU_DMA_HELPER_H_ + +#include "tensorflow/core/public/tensor.h" + +// For internal use only. Visibility should be limited to brain/framework. + +namespace tensorflow { +class DMAHelper { + public: + static bool CanUseDMA(const Tensor* t) { return t->CanUseDMA(); } + static const void* base(const Tensor* t) { return t->base<const void>(); } + static void* base(Tensor* t) { return t->base<void>(); } + static TensorBuffer* buffer(Tensor* t) { return t->buf_; } + static const TensorBuffer* buffer(const Tensor* t) { return t->buf_; } +}; +} // namespace tensorflow +#endif // TENSORFLOW_COMMON_RUNTIME_GPU_DMA_HELPER_H_ diff --git a/tensorflow/core/common_runtime/gpu/gpu_allocator_retry.cc b/tensorflow/core/common_runtime/gpu/gpu_allocator_retry.cc new file mode 100644 index 0000000000..742459c63b --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/gpu_allocator_retry.cc @@ -0,0 +1,49 @@ +#include "tensorflow/core/common_runtime/gpu/gpu_allocator_retry.h" +#include "tensorflow/core/public/env.h" +#include "tensorflow/core/platform/logging.h" +#include "tensorflow/core/platform/port.h" + +namespace tensorflow { + +GPUAllocatorRetry::GPUAllocatorRetry() : env_(Env::Default()) {} + +void* GPUAllocatorRetry::AllocateRaw( + std::function<void*(size_t alignment, size_t num_bytes, + bool verbose_failure)> alloc_func, + int max_millis_to_wait, size_t alignment, size_t num_bytes) { + if (num_bytes == 0) { + LOG(WARNING) << "Request to allocate 0 bytes"; + return nullptr; + } + uint64 deadline_micros = env_->NowMicros() + max_millis_to_wait * 1000; + void* ptr = nullptr; + while (ptr == nullptr) { + ptr = alloc_func(alignment, num_bytes, false); + if (ptr == nullptr) { + uint64 now = env_->NowMicros(); + if (now < deadline_micros) { + mutex_lock l(mu_); + WaitForMilliseconds(&l, &memory_returned_, + (deadline_micros - now) / 1000); + } else { + return alloc_func(alignment, num_bytes, true); + } + } + } + return ptr; +} + +void GPUAllocatorRetry::DeallocateRaw(std::function<void(void*)> dealloc_func, + void* ptr) { + if (ptr == nullptr) { + LOG(ERROR) << "Request to free nullptr"; + return; + } + dealloc_func(ptr); + { + mutex_lock l(mu_); + memory_returned_.notify_all(); + } +} + +} // namespace tensorflow diff --git a/tensorflow/core/common_runtime/gpu/gpu_allocator_retry.h b/tensorflow/core/common_runtime/gpu/gpu_allocator_retry.h new file mode 100644 index 0000000000..a3298ab222 --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/gpu_allocator_retry.h @@ -0,0 +1,36 @@ +#ifndef TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_ALLOCATOR_RETRY_H_ +#define TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_ALLOCATOR_RETRY_H_ + +#include "tensorflow/core/platform/port.h" +#include "tensorflow/core/public/env.h" + +namespace tensorflow { + +// A retrying wrapper for a memory allocator. +class GPUAllocatorRetry { + public: + GPUAllocatorRetry(); + + // Call 'alloc_func' to obtain memory. On first call, + // 'verbose_failure' will be false. If return value is nullptr, + // then wait up to 'max_millis_to_wait' milliseconds, retrying each + // time a call to DeallocateRaw() is detected, until either a good + // pointer is returned or the deadline is exhausted. 
If the + // deadline is exahusted, try one more time with 'verbose_failure' + // set to true. The value returned is either the first good pointer + // obtained from 'alloc_func' or nullptr. + void* AllocateRaw(std::function<void*(size_t alignment, size_t num_bytes, + bool verbose_failure)> alloc_func, + int max_millis_to_wait, size_t alignment, size_t bytes); + + // Calls dealloc_func(ptr) and then notifies any threads blocked in + // AllocateRaw() that would like to retry. + void DeallocateRaw(std::function<void(void* ptr)> dealloc_func, void* ptr); + + private: + Env* env_; + mutex mu_; + condition_variable memory_returned_; +}; +} // namespace tensorflow +#endif // TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_ALLOCATOR_RETRY_H_ diff --git a/tensorflow/core/common_runtime/gpu/gpu_allocator_retry_test.cc b/tensorflow/core/common_runtime/gpu/gpu_allocator_retry_test.cc new file mode 100644 index 0000000000..db1c58cc65 --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/gpu_allocator_retry_test.cc @@ -0,0 +1,175 @@ +#include "tensorflow/core/common_runtime/gpu/gpu_allocator_retry.h" + +#include "tensorflow/core/lib/core/notification.h" +#include "tensorflow/core/platform/port.h" +#include "tensorflow/core/platform/logging.h" +#include "tensorflow/core/platform/thread_annotations.h" +#include "tensorflow/core/public/env.h" +#include <gtest/gtest.h> + +namespace tensorflow { +namespace { + +class FakeAllocator { + public: + FakeAllocator(size_t cap, int millis_to_wait) + : memory_capacity_(cap), millis_to_wait_(millis_to_wait) {} + + // Allocate just keeps track of the number of outstanding allocations, + // not their sizes. Assume a constant size for each. + void* AllocateRaw(size_t alignment, size_t num_bytes) { + return retry_.AllocateRaw( + [this](size_t a, size_t nb, bool v) { + mutex_lock l(mu_); + if (memory_capacity_ > 0) { + --memory_capacity_; + return good_ptr_; + } else { + return static_cast<void*>(nullptr); + } + }, + millis_to_wait_, alignment, num_bytes); + } + + void DeallocateRaw(void* ptr) { + retry_.DeallocateRaw( + [this](void* p) { + mutex_lock l(mu_); + ++memory_capacity_; + }, + ptr); + } + + private: + GPUAllocatorRetry retry_; + void* good_ptr_ = reinterpret_cast<void*>(0xdeadbeef); + mutex mu_; + size_t memory_capacity_ GUARDED_BY(mu_); + int millis_to_wait_; +}; + +class GPUAllocatorRetryTest : public ::testing::Test { + protected: + GPUAllocatorRetryTest() {} + + void LaunchConsumerThreads(int num_consumers, int cap_needed) { + consumer_count_.resize(num_consumers, 0); + for (int i = 0; i < num_consumers; ++i) { + consumers_.push_back(Env::Default()->StartThread( + ThreadOptions(), "anon_thread", [this, i, cap_needed]() { + do { + void* ptr = nullptr; + for (int j = 0; j < cap_needed; ++j) { + ptr = alloc_->AllocateRaw(16, 1); + if (ptr == nullptr) { + mutex_lock l(mu_); + has_failed_ = true; + return; + } + } + ++consumer_count_[i]; + for (int j = 0; j < cap_needed; ++j) { + alloc_->DeallocateRaw(ptr); + } + } while (!notifier_.HasBeenNotified()); + })); + } + } + + // Wait up to wait_micros microseconds for has_failed_ to equal expected, + // then terminate all threads. + void JoinConsumerThreads(bool expected, int wait_micros) { + while (wait_micros > 0) { + { + mutex_lock l(mu_); + if (has_failed_ == expected) break; + } + int interval_micros = std::min(1000, wait_micros); + Env::Default()->SleepForMicroseconds(interval_micros); + wait_micros -= interval_micros; + } + notifier_.Notify(); + for (auto c : consumers_) { + // Blocks until thread terminates. 
+ delete c; + } + } + + std::unique_ptr<FakeAllocator> alloc_; + std::vector<Thread*> consumers_; + std::vector<int> consumer_count_; + Notification notifier_; + mutex mu_; + bool has_failed_ GUARDED_BY(mu_) = false; + int count_ GUARDED_BY(mu_) = 0; +}; + +// Verifies correct retrying when memory is slightly overcommitted but +// we allow retry. +TEST_F(GPUAllocatorRetryTest, RetrySuccess) { + // Support up to 2 allocations simultaneously, waits up to 10 msec for + // a chance to alloc. + alloc_.reset(new FakeAllocator(2, 10000)); + // Launch 3 consumers, each of whom needs 1 unit at a time. + LaunchConsumerThreads(3, 1); + // This should be enough time for each consumer to be satisfied many times. + Env::Default()->SleepForMicroseconds(50000); + JoinConsumerThreads(false, 0); + for (int i = 0; i < 3; ++i) { + LOG(INFO) << "Consumer " << i << " is " << consumer_count_[i]; + } + { + mutex_lock l(mu_); + EXPECT_FALSE(has_failed_); + } + EXPECT_GT(consumer_count_[0], 0); + EXPECT_GT(consumer_count_[1], 0); + EXPECT_GT(consumer_count_[2], 0); +} + +// Verifies OutOfMemory failure when memory is slightly overcommitted +// and retry is not allowed. +TEST_F(GPUAllocatorRetryTest, NoRetryFail) { + // Support up to 2 allocations simultaneously, waits up to 0 msec for + // a chance to alloc. + alloc_.reset(new FakeAllocator(2, 0)); + // Launch 3 consumers, each of whom needs 1 unit at a time. + LaunchConsumerThreads(3, 1); + Env::Default()->SleepForMicroseconds(50000); + // Will wait up to 10 seconds for proper race condition to occur, resulting + // in failure. + JoinConsumerThreads(true, 10000000); + for (int i = 0; i < 3; ++i) { + LOG(INFO) << "Consumer " << i << " is " << consumer_count_[i]; + } + { + mutex_lock l(mu_); + EXPECT_TRUE(has_failed_); + } +} + +// Verifies OutOfMemory failure when retry is allowed but memory capacity +// is too low even for retry. +TEST_F(GPUAllocatorRetryTest, RetryInsufficientFail) { + // Support up to 2 allocations simultaneously, waits up to 10 msec for + // a chance to alloc. + alloc_.reset(new FakeAllocator(2, 10000)); + // Launch 3 consumers, each of whom needs 2 units at a time. We expect + // deadlock where 2 consumers each hold 1 unit, and timeout trying to + // get the second. + LaunchConsumerThreads(3, 2); + Env::Default()->SleepForMicroseconds(50000); + // Will wait up to 10 seconds for proper race condition to occur, resulting + // in failure. 
+ JoinConsumerThreads(true, 10000000); + for (int i = 0; i < 3; ++i) { + LOG(INFO) << "Consumer " << i << " is " << consumer_count_[i]; + } + { + mutex_lock l(mu_); + EXPECT_TRUE(has_failed_); + } +} + +} // namespace +} // namespace tensorflow diff --git a/tensorflow/core/common_runtime/gpu/gpu_bfc_allocator.cc b/tensorflow/core/common_runtime/gpu/gpu_bfc_allocator.cc new file mode 100644 index 0000000000..3df833594f --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/gpu_bfc_allocator.cc @@ -0,0 +1,397 @@ +#include "tensorflow/core/common_runtime/gpu/gpu_bfc_allocator.h" + +#include "tensorflow/stream_executor/multi_platform_manager.h" +#include "tensorflow/stream_executor/stream_executor.h" +#include "tensorflow/core/common_runtime/gpu/gpu_allocator_retry.h" +#include "tensorflow/core/common_runtime/gpu/gpu_init.h" +#include "tensorflow/core/lib/core/bits.h" +#include "tensorflow/core/lib/gtl/stl_util.h" +#include "tensorflow/core/lib/strings/numbers.h" +#include "tensorflow/core/lib/strings/str_util.h" +#include "tensorflow/core/lib/strings/strcat.h" +#include "tensorflow/core/platform/logging.h" +#include "tensorflow/core/platform/port.h" + +namespace gpu = ::perftools::gputools; + +namespace tensorflow { + +GPUBFCAllocator::GPUBFCAllocator(int device_id, size_t total_memory) + : device_id_(device_id) { + // Get a pointer to the stream_executor for this device + stream_exec_ = GPUMachineManager()->ExecutorForDevice(device_id).ValueOrDie(); + + // Allocate the requested amount of memory. + gpu_memory_size_ = total_memory; + + LOG(INFO) << "Allocating " << strings::HumanReadableNumBytes(gpu_memory_size_) + << " bytes."; + gpu::DeviceMemory<char> gpu_mem = + stream_exec_->AllocateArray<char>(gpu_memory_size_); + + QCHECK(gpu_mem != nullptr) + << " Could not allocate GPU device memory for device " << device_id + << ". Tried to allocate " + << strings::HumanReadableNumBytes(gpu_memory_size_); + base_ptr_ = gpu_mem.opaque(); + LOG(INFO) << "GPU " << device_id << " memory begins at " << base_ptr_ + << " extends to " + << static_cast<void*>( + (static_cast<char*>(base_ptr_) + gpu_memory_size_)); + + // Create a bunch of bins of various good sizes. + + // Covers allocations of exactly 256 bytes (the minimum size). + bins_.insert(std::make_pair(256, new Bin(256))); + + // We create bins to fit all possible ranges that cover the + // gpu_memory_size_ starting from allocations up to 1024 bytes to + // allocations up to (and including) the memory limit. + for (size_t bin_size = 1024; bin_size < gpu_memory_size_ * 2; bin_size *= 2) { + LOG(INFO) << "Creating bin of max chunk size " + << strings::HumanReadableNumBytes(bin_size); + bins_.insert(std::make_pair(bin_size, new Bin(bin_size))); + } + + // Create one large chunk for the whole memory space that will + // be chunked later. + GPUBFCAllocator::Chunk* c = new GPUBFCAllocator::Chunk(); + c->ptr = gpu_mem.opaque(); + c->size = gpu_memory_size_; + c->in_use = false; + c->prev = nullptr; + c->next = nullptr; + + ptr_to_chunk_map_.insert(std::make_pair(c->ptr, c)); + + // Insert the chunk into the right bin. + ReassignChunkToBin(c); +} + +GPUBFCAllocator::~GPUBFCAllocator() { + // Return memory back. 
+ if (base_ptr_) { + gpu::DeviceMemoryBase gpu_ptr{base_ptr_}; + stream_exec_->Deallocate(&gpu_ptr); + } + + gtl::STLDeleteValues(&bins_); +} + +void* GPUBFCAllocator::AllocateRaw(size_t unused_alignment, size_t num_bytes) { + static const int64 kMaxMillisToWait = 10000; // 10 seconds + return retry_helper_.AllocateRaw( + [this](size_t a, size_t nb, bool v) { + return AllocateRawInternal(a, nb, v); + }, + kMaxMillisToWait, unused_alignment, num_bytes); +} + +void* GPUBFCAllocator::AllocateRawInternal(size_t unused_alignment, + size_t num_bytes, + bool dump_log_on_failure) { + if (num_bytes == 0) { + LOG(ERROR) << "tried to allocate 0 bytes"; + return nullptr; + } + // First, always allocate memory of at least 256 bytes, and always + // allocate multiples of 256 bytes so all memory addresses are + // nicely byte aligned. + size_t rounded_bytes = (256 * ((num_bytes + 255) / 256)); + DCHECK_EQ(0, rounded_bytes % 256); + + // The BFC allocator tries to find the best fit first. + // + // First identify the first bin that could satisfy rounded_bytes. + auto it = bins_.lower_bound(rounded_bytes); + if (it == bins_.end()) { + LOG(ERROR) << " Asked for " << rounded_bytes << " but largest bin was " + << bins_.rbegin()->first; + return nullptr; + } + + mutex_lock l(lock_); + for (; it != bins_.end(); ++it) { + // Start searching from the first bin for the smallest chunk that fits + // rounded_bytes. + Bin* b = it->second; + for (GPUBFCAllocator::Chunk* chunk : b->chunks) { + if (!chunk->in_use && chunk->size > rounded_bytes) { + // We found an existing chunk that fits us that wasn't in use. + chunk->in_use = true; + + // If we can break the size of the chunk into two reasonably + // large pieces, do so. + // + // TODO(vrv): What should be the criteria when deciding when + // to split? + if (chunk->size >= rounded_bytes * 2) { + SplitChunk(chunk, rounded_bytes); + } + + // The requested size of the returned chunk is what the user + // has allocated. + chunk->requested_size = num_bytes; + + VLOG(4) << "Returning: " << chunk->ptr; + return chunk->ptr; + } + } + } + + // We searched all bins for an existing free chunk to use and + // couldn't find one. This means we must have run out of memory, + // Dump the memory log for analysis. + if (dump_log_on_failure) { + DumpMemoryLog(rounded_bytes); + LOG(WARNING) << "Ran out of memory trying to allocate " + << strings::HumanReadableNumBytes(num_bytes) + << ". See logs for memory state"; + } + return nullptr; +} + +void GPUBFCAllocator::SplitChunk(GPUBFCAllocator::Chunk* c, size_t num_bytes) { + // Create a new chunk starting num_bytes after c + GPUBFCAllocator::Chunk* new_chunk = new GPUBFCAllocator::Chunk(); + new_chunk->ptr = static_cast<void*>(static_cast<char*>(c->ptr) + num_bytes); + VLOG(6) << "Adding to chunk map: " << new_chunk->ptr; + ptr_to_chunk_map_.insert(std::make_pair(new_chunk->ptr, new_chunk)); + + // Set the new sizes of the chunks. + new_chunk->size = c->size - num_bytes; + c->size = num_bytes; + + // The new chunk is not in use. + new_chunk->in_use = false; + + // Maintain the pointers. 
+ // c <-> c_neighbor becomes + // c <-> new_chunk <-> c_neighbor + GPUBFCAllocator::Chunk* c_neighbor = c->next; + new_chunk->prev = c; + new_chunk->next = c_neighbor; + c->next = new_chunk; + if (c_neighbor) { + c_neighbor->prev = new_chunk; + } + + // Maintain the bins + ReassignChunkToBin(new_chunk); + ReassignChunkToBin(c); +} + +void GPUBFCAllocator::DeallocateRaw(void* ptr) { + retry_helper_.DeallocateRaw([this](void* p) { DeallocateRawInternal(p); }, + ptr); +} + +void GPUBFCAllocator::DeallocateRawInternal(void* ptr) { + if (ptr == nullptr) { + LOG(ERROR) << "tried to deallocate nullptr"; + return; + } + mutex_lock l(lock_); + + // Find the chunk from the ptr. + auto it = ptr_to_chunk_map_.find(ptr); + CHECK(it != ptr_to_chunk_map_.end()) + << "Asked to deallocate a pointer we never allocated: " << ptr; + + GPUBFCAllocator::Chunk* c = it->second; + VLOG(6) << "Chunk at " << c->ptr << " no longer in use"; + // Mark the chunk as no longer in use + c->in_use = false; + + // Consider coalescing it. + MaybeCoalesce(c); +} + +// Merges c1 and c2 when c1->next is c2 and c2->prev is c1. +// We merge c2 into c1. +void GPUBFCAllocator::Merge(GPUBFCAllocator::Chunk* c1, + GPUBFCAllocator::Chunk* c2) { + // We can only merge chunks that are not in use. + DCHECK(!c1->in_use && !c2->in_use); + + // c1's prev doesn't change, still points to the same ptr, and is + // still not in use. + + // Fix up neighbor pointers + // + // c1 <-> c2 <-> c3 should become + // c1 <-> c3 + GPUBFCAllocator::Chunk* c3 = c2->next; + c1->next = c3; + CHECK(c2->prev == c1); + if (c3 != nullptr) { + c3->prev = c1; + } + + // Set the new size + c1->size += c2->size; + + // Delete c2 and cleanup all state + RemoveChunkFromBin(c2); +} + +void GPUBFCAllocator::ReassignChunkToBin(GPUBFCAllocator::Chunk* c) { + auto it = bins_.lower_bound(c->size); + CHECK(it != bins_.end()) << " Tried to reassign to non-existent bin for size " + << c->size; + + Bin* new_bin = it->second; + + // If the bin has not changed, do nothing. + Bin* old_bin = c->bin; + if (old_bin != nullptr && new_bin == old_bin) { + return; + } + + // The bin has changed. Add the chunk to the new bin and remove + // the chunk from the old bin. + new_bin->chunks.insert(c); + c->bin = new_bin; + + if (old_bin == nullptr) { + return; + } + + // Remove chunk from old bin + for (auto it = old_bin->chunks.begin(); it != old_bin->chunks.end(); ++it) { + if (*it == c) { + old_bin->chunks.erase(it); + return; + } + } + CHECK(false) << "Could not find chunk in old bin"; +} + +void GPUBFCAllocator::RemoveChunkFromBin(GPUBFCAllocator::Chunk* c) { + Bin* b = c->bin; + for (auto it = b->chunks.begin(); it != b->chunks.end(); ++it) { + Chunk* other_c = *it; + if (other_c->ptr == c->ptr) { + b->chunks.erase(it); + VLOG(4) << "Removing: " << c->ptr; + ptr_to_chunk_map_.erase(c->ptr); + delete c; + return; + } + } + + CHECK(false) << "Could not find chunk in bin"; +} + +void GPUBFCAllocator::MaybeCoalesce(GPUBFCAllocator::Chunk* c) { + // This chunk is no longer in-use, consider coalescing the chunk + // with adjacent chunks. + Chunk* chunk_to_reassign = nullptr; + + // If the next chunk is free, coalesce the two, if the result would + // fit in an existing bin. 
+ if (c->next && !c->next->in_use) { + VLOG(8) << "Chunk at " << c->next->ptr << " merging with c " << c->ptr; + + chunk_to_reassign = c; + + // Deletes c->next + Merge(c, c->next); + } + + // If the previous chunk is free, coalesce the two + if (c->prev && !c->prev->in_use) { + VLOG(8) << "Chunk at " << c->ptr << " merging into c->prev " + << c->prev->ptr; + + chunk_to_reassign = c->prev; + + // Deletes c + Merge(c->prev, c); + } + + // Reassign the final merged chunk into the right bin. + if (chunk_to_reassign) { + ReassignChunkToBin(chunk_to_reassign); + } +} + +void GPUBFCAllocator::AddAllocVisitor(Visitor visitor) { + VLOG(1) << "AddVisitor"; + mutex_lock l(lock_); + region_visitors_.push_back(visitor); + visitor(base_ptr_, gpu_memory_size_); +} + +bool GPUBFCAllocator::TracksAllocationSizes() { return true; } + +size_t GPUBFCAllocator::RequestedSize(void* ptr) { + mutex_lock l(lock_); + auto it = ptr_to_chunk_map_.find(ptr); + CHECK(it != ptr_to_chunk_map_.end()) + << "Asked for requested size of pointer we never allocated: " << ptr; + GPUBFCAllocator::Chunk* c = it->second; + return c->requested_size; +} + +size_t GPUBFCAllocator::AllocatedSize(void* ptr) { + mutex_lock l(lock_); + auto it = ptr_to_chunk_map_.find(ptr); + CHECK(it != ptr_to_chunk_map_.end()) + << "Asked for allocated size of pointer we never allocated: " << ptr; + GPUBFCAllocator::Chunk* c = it->second; + return c->size; +} + +void GPUBFCAllocator::DumpMemoryLog(size_t num_bytes) { + // For each bin: tally up the total number of chunks and bytes. + for (auto bit : bins_) { + Bin* b = bit.second; + + size_t total_bytes_in_use = 0; + size_t total_bytes_in_bin = 0; + size_t total_requested_bytes_in_use = 0; + size_t total_requested_bytes_in_bin = 0; + size_t total_chunks_in_use = 0; + size_t total_chunks_in_bin = 0; + for (Chunk* c : b->chunks) { + total_bytes_in_bin += c->size; + total_requested_bytes_in_bin += c->requested_size; + ++total_chunks_in_bin; + if (c->in_use) { + total_bytes_in_use += c->size; + total_requested_bytes_in_use += c->requested_size; + ++total_chunks_in_use; + } + } + + LOG(INFO) << "Bin (" << b->bin_size + << "): \tTotal Chunks: " << total_chunks_in_bin + << ", Chunks in use: " << total_chunks_in_use << " " + << strings::HumanReadableNumBytes(total_bytes_in_bin) + << " allocated for chunks. " + << strings::HumanReadableNumBytes(total_requested_bytes_in_bin) + << " client-requested for chunks. " + << strings::HumanReadableNumBytes(total_bytes_in_use) + << " in use in bin. " + << strings::HumanReadableNumBytes(total_requested_bytes_in_use) + << " client-requested in use in bin."; + } + + // Find the bin that we would have liked to allocate in, so we + // can get some further analysis about fragmentation. 
+ auto it = bins_.lower_bound(num_bytes); + if (it != bins_.end()) { + Bin* b = it->second; + + LOG(INFO) << "Bin for " << strings::HumanReadableNumBytes(num_bytes) + << " was " << strings::HumanReadableNumBytes(b->bin_size) + << ", Chunk State: "; + + for (Chunk* c : b->chunks) { + LOG(INFO) << c->DebugString(true); + } + } +} + +} // namespace tensorflow diff --git a/tensorflow/core/common_runtime/gpu/gpu_bfc_allocator.h b/tensorflow/core/common_runtime/gpu/gpu_bfc_allocator.h new file mode 100644 index 0000000000..3d1601e132 --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/gpu_bfc_allocator.h @@ -0,0 +1,156 @@ +#ifndef TENSORFLOW_COMMON_RUNTIME_GPU_GPU_BFC_ALLOCATOR_H_ +#define TENSORFLOW_COMMON_RUNTIME_GPU_GPU_BFC_ALLOCATOR_H_ + +#include <memory> +#include <string> +#include <unordered_map> +#include <vector> + +#include "tensorflow/stream_executor/stream_executor.h" +#include "tensorflow/core/common_runtime/gpu/gpu_allocator_retry.h" +#include "tensorflow/core/common_runtime/gpu/visitable_allocator.h" +#include "tensorflow/core/lib/gtl/stl_util.h" +#include "tensorflow/core/lib/strings/strcat.h" +#include "tensorflow/core/platform/port.h" +#include "tensorflow/core/platform/thread_annotations.h" + +namespace tensorflow { + +// A GPU memory allocator that implements a 'best-fit with coalescing' +// algorithm. This is essentially a very simple version of Doug Lea's +// malloc (dlmalloc). +// +// The goal of this allocator is to support defragmentation via +// coalescing. One assumption we make is that the process using this +// allocator owns pretty much all of the GPU memory, and that nearly +// all requests to allocate GPU memory go through this interface. +class GPUBFCAllocator : public VisitableAllocator { + public: + // 'device_id' refers to the StreamExecutor ID of the device within + // the process and must reference a valid ID in the process. + explicit GPUBFCAllocator(int device_id, size_t total_memory); + ~GPUBFCAllocator() override; + + string Name() override { return "gpu_bfc"; } + void* AllocateRaw(size_t alignment, size_t num_bytes) override; + void DeallocateRaw(void* ptr) override; + + void AddAllocVisitor(Visitor visitor) override; + + // Does nothing, because gpu memory is never freed. + void AddFreeVisitor(Visitor visitor) override {} + + bool TracksAllocationSizes() override; + + size_t RequestedSize(void* ptr) override; + + size_t AllocatedSize(void* ptr) override; + + private: + struct Bin; + + void* AllocateRawInternal(size_t alignment, size_t num_bytes, + bool dump_log_on_failure); + void DeallocateRawInternal(void* ptr); + + // Chunks point to GPU memory. Their prev/next pointers form a + // doubly-linked list of addresses sorted by GPU base address that + // must be contiguous. Chunks contain information about whether + // they are in use or whether they are free, and contain a pointer + // to the bin they are in. + struct Chunk { + size_t size = 0; // Full size of GPU buffer. + + // We sometimes give chunks that are larger than needed to reduce + // fragmentation. requested_size keeps track of what the client + // actually wanted so we can understand whether our splitting + // strategy is efficient. + size_t requested_size = 0; + + bool in_use = false; + void* ptr = nullptr; // pointer to granted GPU subbuffer. + + // If not null, the memory referred to by 'prev' is directly + // preceding the memory used by this chunk. 
E.g., It should start + // at 'ptr - prev->size' + Chunk* prev = nullptr; + + // If not null, the memory referred to by 'next' is directly + // following the memory used by this chunk. E.g., It should be at + // 'ptr + size' + Chunk* next = nullptr; + + // What bin are we in? + Bin* bin = nullptr; + + string DebugString(bool recurse) { + string dbg; + strings::StrAppend(&dbg, " Size: ", strings::HumanReadableNumBytes(size), + " | Requested Size: ", + strings::HumanReadableNumBytes(requested_size), + " | in_use: ", in_use); + if (recurse && prev) { + strings::StrAppend(&dbg, ", prev: ", prev->DebugString(false)); + } + if (recurse && next) { + strings::StrAppend(&dbg, ", next: ", next->DebugString(false)); + } + return dbg; + } + }; + + Chunk* AllocateNewChunk(size_t num_bytes); + void SplitChunk(Chunk* c, size_t num_bytes); + void Merge(Chunk* c1, Chunk* c2); + void MaybeCoalesce(Chunk* c); + + void ReassignChunkToBin(Chunk* c); + void RemoveChunkFromBin(Chunk* c); + + void DumpMemoryLog(size_t num_bytes); + + // A Bin is a collection of similar-sized Chunks. + struct Bin { + // All chunks in this bin have >= bin_size memory. + size_t bin_size = 0; + + struct ChunkComparator { + bool operator()(Chunk* a, Chunk* b) { return a->size < b->size; } + }; + + // List of chunks within the bin, sorted by chunk size. + std::multiset<Chunk*, ChunkComparator> chunks; + + explicit Bin(size_t bs) : bin_size(bs) {} + + ~Bin() { gtl::STLDeleteElements(&chunks); } + }; + + GPUAllocatorRetry retry_helper_; + + // Structures immutable after construction + const int device_id_; + // The base pointer where all the GPU memory begins. + void* base_ptr_ = nullptr; + size_t gpu_memory_size_ = 0; + + // Map from bin size to Bin + // After construction, the bin map is never resized. + std::map<size_t, Bin*> bins_; + + perftools::gputools::StreamExecutor* stream_exec_; // Not owned. + + // Structures mutable after construction + mutable mutex lock_; + // Not owned. + std::unordered_map<void*, Chunk*> ptr_to_chunk_map_; + + // Called once on each region, ASAP. + std::vector<Visitor> region_visitors_; + + TF_DISALLOW_COPY_AND_ASSIGN(GPUBFCAllocator); +}; + +} // namespace tensorflow + +#endif // TENSORFLOW_COMMON_RUNTIME_GPU_GPU_BFC_ALLOCATOR_H_ diff --git a/tensorflow/core/common_runtime/gpu/gpu_bfc_allocator_test.cc b/tensorflow/core/common_runtime/gpu/gpu_bfc_allocator_test.cc new file mode 100644 index 0000000000..7b5e8aec1d --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/gpu_bfc_allocator_test.cc @@ -0,0 +1,166 @@ +#if GOOGLE_CUDA + +#include "tensorflow/core/common_runtime/gpu/gpu_bfc_allocator.h" + +#include <algorithm> +#include <vector> + +#include "tensorflow/stream_executor/stream_executor.h" +#include <gtest/gtest.h> +#include "tensorflow/core/common_runtime/gpu/gpu_init.h" +#include "tensorflow/core/lib/gtl/inlined_vector.h" +#include "tensorflow/core/lib/random/simple_philox.h" +#include "tensorflow/core/platform/logging.h" +#include "tensorflow/core/platform/port.h" + +namespace gpu = ::perftools::gputools; + +namespace tensorflow { +namespace { + +TEST(GPUBFCAllocatorTest, NoDups) { + GPUBFCAllocator a(0, 1 << 30); + // Allocate a lot of raw pointers + std::vector<void*> ptrs; + for (int s = 1; s < 1024; s++) { + void* raw = a.AllocateRaw(1, s); + ptrs.push_back(raw); + } + + std::sort(ptrs.begin(), ptrs.end()); + + // Make sure none of them are equal, and that none of them overlap. 
+ for (int i = 0; i < ptrs.size(); i++) { + if (i > 0) { + ASSERT_NE(ptrs[i], ptrs[i - 1]); // No dups + size_t req_size = a.RequestedSize(ptrs[i - 1]); + ASSERT_GT(req_size, 0); + ASSERT_GE(static_cast<char*>(ptrs[i]) - static_cast<char*>(ptrs[i - 1]), + req_size); + } + } + + for (int i = 0; i < ptrs.size(); i++) { + a.DeallocateRaw(ptrs[i]); + } +} + +TEST(GPUBFCAllocatorTest, AllocationsAndDeallocations) { + GPUBFCAllocator a(0, 1 << 30); + // Allocate 256 raw pointers of sizes between 100 bytes and about + // a meg + random::PhiloxRandom philox(123, 17); + random::SimplePhilox rand(&philox); + + std::vector<void*> initial_ptrs; + for (int s = 1; s < 256; s++) { + size_t size = std::min<size_t>( + std::max<size_t>(rand.Rand32() % 1048576, 100), 1048576); + void* raw = a.AllocateRaw(1, size); + + initial_ptrs.push_back(raw); + } + + // Deallocate half of the memory, and keep track of the others. + std::vector<void*> existing_ptrs; + for (int i = 0; i < initial_ptrs.size(); i++) { + if (i % 2 == 1) { + a.DeallocateRaw(initial_ptrs[i]); + } else { + existing_ptrs.push_back(initial_ptrs[i]); + } + } + + // Allocate a lot of raw pointers + for (int s = 1; s < 256; s++) { + size_t size = std::min<size_t>( + std::max<size_t>(rand.Rand32() % 1048576, 100), 1048576); + void* raw = a.AllocateRaw(1, size); + existing_ptrs.push_back(raw); + } + + std::sort(existing_ptrs.begin(), existing_ptrs.end()); + // Make sure none of them are equal + for (int i = 0; i < existing_ptrs.size(); i++) { + if (i > 0) { + CHECK_NE(existing_ptrs[i], existing_ptrs[i - 1]); // No dups + + size_t req_size = a.RequestedSize(existing_ptrs[i - 1]); + ASSERT_GT(req_size, 0); + + // Check that they don't overlap. + ASSERT_GE(static_cast<char*>(existing_ptrs[i]) - + static_cast<char*>(existing_ptrs[i - 1]), + req_size); + } + } + + for (int i = 0; i < existing_ptrs.size(); i++) { + a.DeallocateRaw(existing_ptrs[i]); + } +} + +TEST(GPUBFCAllocatorTest, ExerciseCoalescing) { + GPUBFCAllocator a(0, 1 << 30); + + float* first_ptr = a.Allocate<float>(1024); + a.Deallocate(first_ptr); + for (int i = 0; i < 1024; ++i) { + // Allocate several buffers of different sizes, and then clean them + // all up. We should be able to repeat this endlessly without + // causing fragmentation and growth. + float* t1 = a.Allocate<float>(1024); + + int64* t2 = a.Allocate<int64>(1048576); + double* t3 = a.Allocate<double>(2048); + float* t4 = a.Allocate<float>(10485760); + + a.Deallocate(t1); + a.Deallocate(t2); + a.Deallocate(t3); + a.Deallocate(t4); + } + + // At the end, we should have coalesced all memory into one region + // starting at the beginning, so validate that allocating a pointer + // starts from this region. 
+ float* first_ptr_after = a.Allocate<float>(1024); + EXPECT_EQ(first_ptr, first_ptr_after); + a.Deallocate(first_ptr_after); +} + +TEST(GPUBFCAllocatorTest, AllocateZeroBufSize) { + GPUBFCAllocator a(0, 1 << 30); + float* ptr = a.Allocate<float>(0); + EXPECT_EQ(nullptr, ptr); +} + +TEST(GPUBFCAllocatorTest, TracksSizes) { + GPUBFCAllocator a(0, 1 << 30); + EXPECT_EQ(true, a.TracksAllocationSizes()); +} + +TEST(GPUBFCAllocatorTest, AllocatedVsRequested) { + GPUBFCAllocator a(0, 1 << 30); + float* t1 = a.Allocate<float>(1); + EXPECT_EQ(4, a.RequestedSize(t1)); + EXPECT_EQ(256, a.AllocatedSize(t1)); + a.Deallocate(t1); +} + +TEST(GPUBFCAllocatorTest, TestCustomMemoryLimit) { + // Configure a 1MiB byte limit + GPUBFCAllocator a(0, 1 << 20); + + float* first_ptr = a.Allocate<float>(1 << 6); + float* second_ptr = a.Allocate<float>(1 << 20); + + EXPECT_NE(nullptr, first_ptr); + EXPECT_EQ(nullptr, second_ptr); + a.Deallocate(first_ptr); +} + +} // namespace +} // namespace tensorflow + +#endif // GOOGLE_CUDA diff --git a/tensorflow/core/common_runtime/gpu/gpu_debug_allocator.cc b/tensorflow/core/common_runtime/gpu/gpu_debug_allocator.cc new file mode 100644 index 0000000000..5ec405cd80 --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/gpu_debug_allocator.cc @@ -0,0 +1,186 @@ +#include "tensorflow/core/common_runtime/gpu/gpu_debug_allocator.h" + +#include "tensorflow/core/common_runtime/gpu/gpu_init.h" +#include "tensorflow/stream_executor/multi_platform_manager.h" +#include "tensorflow/stream_executor/stream_executor.h" + +namespace gpu = ::perftools::gputools; + +namespace tensorflow { + +#define MASK_WORDS 2 +#define MASK_BYTES (MASK_WORDS * sizeof(int64)) + +namespace { + +static int64* NewMask(int64 word) { + int64* m = new int64[MASK_WORDS]; + for (int i = 0; i < MASK_WORDS; ++i) { + m[i] = word; + } + return m; +} + +static int64* before_mask = NewMask(0xabababababababab); +static int64* after_mask = NewMask(0xcdcdcdcdcdcdcdcd); + +bool CheckMask(perftools::gputools::StreamExecutor* exec, void* ptr, + int64* mask) { + gpu::DeviceMemory<int64> gpu_ptr{gpu::DeviceMemoryBase{ptr, MASK_BYTES}}; + int64 tmp[MASK_WORDS]; + + if (!exec->SynchronousMemcpy(&tmp, gpu_ptr, MASK_BYTES)) { + LOG(FATAL) << "Could not copy debug mask"; + } + + bool ok = true; + for (int i = 0; i < MASK_WORDS; ++i) { + ok &= (mask[i] == tmp[i]); + if (!ok) { + LOG(ERROR) << "i=" << i + << " mask=" << reinterpret_cast<const void*>(mask[i]) + << " field=" << reinterpret_cast<const void*>(tmp[i]); + } + } + + return ok; +} + +void InitMask(perftools::gputools::StreamExecutor* exec, void* ptr, + int64* mask) { + gpu::DeviceMemory<int64> gpu_ptr{gpu::DeviceMemoryBase{ptr, MASK_BYTES}}; + if (!exec->SynchronousMemcpy(&gpu_ptr, mask, MASK_BYTES)) { + LOG(FATAL) << "Could not copy debug mask"; + } +} + +} // namespace + +// ----------------------------------------------------------------------------- +// GPUDebugAllocator +// ----------------------------------------------------------------------------- +GPUDebugAllocator::GPUDebugAllocator(VisitableAllocator* allocator, + int device_id) + : base_allocator_(allocator) { + stream_exec_ = GPUMachineManager()->ExecutorForDevice(device_id).ValueOrDie(); +} + +GPUDebugAllocator::~GPUDebugAllocator() { delete base_allocator_; } + +void* GPUDebugAllocator::AllocateRaw(size_t alignment, size_t num_bytes) { + num_bytes += (2 * MASK_BYTES); + + void* allocated_ptr = base_allocator_->AllocateRaw(alignment, num_bytes); + + // Return the pointer after the header + void* rv = 
static_cast<char*>(allocated_ptr) + MASK_BYTES; + + // Write the header at allocated_ptr + InitMask(stream_exec_, allocated_ptr, before_mask); + + // Write the footer at the end. + size_t req_size = base_allocator_->RequestedSize(allocated_ptr); + InitMask(stream_exec_, + static_cast<char*>(allocated_ptr) + req_size - MASK_BYTES, + after_mask); + return rv; +} +void GPUDebugAllocator::DeallocateRaw(void* ptr) { + CHECK(CheckHeader(ptr)) << "before_mask has been overwritten"; + CHECK(CheckFooter(ptr)) << "after_mask has been overwritten"; + + // Backtrack to the beginning of the header. + ptr = static_cast<void*>(static_cast<char*>(ptr) - MASK_BYTES); + // Deallocate the memory + base_allocator_->DeallocateRaw(ptr); +} + +void GPUDebugAllocator::AddAllocVisitor(Visitor visitor) { + return base_allocator_->AddAllocVisitor(visitor); +} + +void GPUDebugAllocator::AddFreeVisitor(Visitor visitor) { + return base_allocator_->AddFreeVisitor(visitor); +} + +bool GPUDebugAllocator::TracksAllocationSizes() { return true; } + +size_t GPUDebugAllocator::RequestedSize(void* ptr) { + auto req_size = + base_allocator_->RequestedSize(static_cast<char*>(ptr) - MASK_BYTES); + return req_size - 2 * MASK_BYTES; +} + +size_t GPUDebugAllocator::AllocatedSize(void* ptr) { + return base_allocator_->AllocatedSize(static_cast<char*>(ptr) - MASK_BYTES); +} + +bool GPUDebugAllocator::CheckHeader(void* ptr) { + return CheckMask(stream_exec_, static_cast<char*>(ptr) - MASK_BYTES, + before_mask); +} + +bool GPUDebugAllocator::CheckFooter(void* ptr) { + char* original_ptr = static_cast<char*>(ptr) - MASK_BYTES; + size_t req_size = base_allocator_->RequestedSize(original_ptr); + return CheckMask(stream_exec_, original_ptr + req_size - MASK_BYTES, + after_mask); +} + +// ----------------------------------------------------------------------------- +// GPUNanResetAllocator +// ----------------------------------------------------------------------------- +GPUNanResetAllocator::GPUNanResetAllocator(VisitableAllocator* allocator, + int device_id) + : base_allocator_(allocator) { + stream_exec_ = GPUMachineManager()->ExecutorForDevice(device_id).ValueOrDie(); +} + +GPUNanResetAllocator::~GPUNanResetAllocator() { delete base_allocator_; } + +void* GPUNanResetAllocator::AllocateRaw(size_t alignment, size_t num_bytes) { + void* allocated_ptr = base_allocator_->AllocateRaw(alignment, num_bytes); + + // Initialize the buffer to Nans + size_t req_size = base_allocator_->RequestedSize(allocated_ptr); + std::vector<float> nans(req_size / sizeof(float), std::nanf("")); + gpu::DeviceMemory<float> nan_ptr{ + gpu::DeviceMemoryBase{static_cast<float*>(allocated_ptr), req_size}}; + + if (!stream_exec_->SynchronousMemcpy(&nan_ptr, &nans[0], req_size)) { + LOG(ERROR) << "Could not initialize to NaNs"; + } + + return allocated_ptr; +} +void GPUNanResetAllocator::DeallocateRaw(void* ptr) { + // Reset the buffer to Nans + size_t req_size = base_allocator_->RequestedSize(ptr); + std::vector<float> nans(req_size / sizeof(float), std::nanf("")); + gpu::DeviceMemory<float> nan_ptr{ + gpu::DeviceMemoryBase{static_cast<float*>(ptr), req_size}}; + if (!stream_exec_->SynchronousMemcpy(&nan_ptr, &nans[0], req_size)) { + LOG(ERROR) << "Could not initialize to NaNs"; + } + + // Deallocate the memory + base_allocator_->DeallocateRaw(ptr); +} + +void GPUNanResetAllocator::AddAllocVisitor(Visitor visitor) { + return base_allocator_->AddAllocVisitor(visitor); +} + +void GPUNanResetAllocator::AddFreeVisitor(Visitor visitor) { + return 
base_allocator_->AddFreeVisitor(visitor); +} + +size_t GPUNanResetAllocator::RequestedSize(void* ptr) { + return base_allocator_->RequestedSize(ptr); +} + +size_t GPUNanResetAllocator::AllocatedSize(void* ptr) { + return base_allocator_->AllocatedSize(ptr); +} + +} // namespace tensorflow diff --git a/tensorflow/core/common_runtime/gpu/gpu_debug_allocator.h b/tensorflow/core/common_runtime/gpu/gpu_debug_allocator.h new file mode 100644 index 0000000000..c9b564ffc4 --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/gpu_debug_allocator.h @@ -0,0 +1,68 @@ +#ifndef TENSORFLOW_COMMON_RUNTIME_GPU_GPU_DEBUG_ALLOCATOR_H_ +#define TENSORFLOW_COMMON_RUNTIME_GPU_GPU_DEBUG_ALLOCATOR_H_ + +#include <memory> +#include <string> +#include <unordered_map> +#include <vector> + +#include "tensorflow/core/platform/port.h" +#include "tensorflow/core/common_runtime/gpu/visitable_allocator.h" +#include "tensorflow/stream_executor/stream_executor.h" + +namespace tensorflow { + +// An allocator that wraps a GPU allocator and adds debugging +// functionality that verifies that users do not write outside their +// allocated memory. +class GPUDebugAllocator : public VisitableAllocator { + public: + explicit GPUDebugAllocator(VisitableAllocator* allocator, int device_id); + ~GPUDebugAllocator() override; + string Name() override { return "gpu_debug"; } + void* AllocateRaw(size_t alignment, size_t num_bytes) override; + void DeallocateRaw(void* ptr) override; + void AddAllocVisitor(Visitor visitor) override; + void AddFreeVisitor(Visitor visitor) override; + bool TracksAllocationSizes() override; + size_t RequestedSize(void* ptr) override; + size_t AllocatedSize(void* ptr) override; + + // For testing. + bool CheckHeader(void* ptr); + bool CheckFooter(void* ptr); + + private: + VisitableAllocator* base_allocator_ = nullptr; // owned + + perftools::gputools::StreamExecutor* stream_exec_; // Not owned. + + TF_DISALLOW_COPY_AND_ASSIGN(GPUDebugAllocator); +}; + +// An allocator that wraps a GPU allocator and resets the memory on +// allocation and free to 'NaN', helping to identify cases where the +// user forgets to initialize the memory. +class GPUNanResetAllocator : public VisitableAllocator { + public: + explicit GPUNanResetAllocator(VisitableAllocator* allocator, int device_id); + ~GPUNanResetAllocator() override; + string Name() override { return "gpu_nan_reset"; } + void* AllocateRaw(size_t alignment, size_t num_bytes) override; + void DeallocateRaw(void* ptr) override; + void AddAllocVisitor(Visitor visitor) override; + void AddFreeVisitor(Visitor visitor) override; + size_t RequestedSize(void* ptr) override; + size_t AllocatedSize(void* ptr) override; + + private: + VisitableAllocator* base_allocator_ = nullptr; // owned + + perftools::gputools::StreamExecutor* stream_exec_; // Not owned. 
+ + TF_DISALLOW_COPY_AND_ASSIGN(GPUNanResetAllocator); +}; + +} // namespace tensorflow + +#endif // TENSORFLOW_COMMON_RUNTIME_GPU_GPU_DEBUG_ALLOCATOR_H_ diff --git a/tensorflow/core/common_runtime/gpu/gpu_debug_allocator_test.cc b/tensorflow/core/common_runtime/gpu/gpu_debug_allocator_test.cc new file mode 100644 index 0000000000..5f63906576 --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/gpu_debug_allocator_test.cc @@ -0,0 +1,207 @@ +#if GOOGLE_CUDA + +#include "tensorflow/core/common_runtime/gpu/gpu_debug_allocator.h" + +#include <algorithm> +#include <vector> + +#include "tensorflow/core/platform/port.h" +#include "tensorflow/core/lib/gtl/inlined_vector.h" +#include "tensorflow/core/platform/logging.h" +#include "tensorflow/core/common_runtime/gpu/gpu_init.h" +#include "tensorflow/core/common_runtime/gpu/gpu_bfc_allocator.h" +#include "tensorflow/stream_executor/multi_platform_manager.h" +#include "tensorflow/stream_executor/stream_executor.h" +#include <gtest/gtest.h> + +namespace gpu = ::perftools::gputools; + +namespace tensorflow { + +TEST(GPUDebugAllocatorTest, OverwriteDetection_None) { + const int device_id = 0; + GPUDebugAllocator a(new GPUBFCAllocator(device_id, 1 << 30), device_id); + auto stream_exec = + GPUMachineManager()->ExecutorForDevice(device_id).ValueOrDie(); + + for (int s : {8}) { + std::vector<int64> cpu_array(s); + memset(&cpu_array[0], 0, cpu_array.size() * sizeof(int64)); + int64* gpu_array = a.Allocate<int64>(cpu_array.size()); + gpu::DeviceMemory<int64> gpu_array_ptr{gpu::DeviceMemoryBase{gpu_array}}; + ASSERT_TRUE(stream_exec->SynchronousMemcpy(&gpu_array_ptr, &cpu_array[0], + s * sizeof(int64))); + EXPECT_TRUE(a.CheckHeader(gpu_array)); + EXPECT_TRUE(a.CheckFooter(gpu_array)); + + // Confirm no error on free. + a.DeallocateRaw(gpu_array); + } +} + +TEST(GPUDebugAllocatorTest, OverwriteDetection_Header) { + for (int s : {8, 211}) { + EXPECT_DEATH( + { + const int device_id = 0; + GPUDebugAllocator a(new GPUBFCAllocator(device_id, 1 << 30), + device_id); + auto stream_exec = + GPUMachineManager()->ExecutorForDevice(device_id).ValueOrDie(); + + std::vector<int64> cpu_array(s); + memset(&cpu_array[0], 0, cpu_array.size() * sizeof(int64)); + int64* gpu_array = a.Allocate<int64>(cpu_array.size()); + + gpu::DeviceMemory<int64> gpu_array_ptr{ + gpu::DeviceMemoryBase{gpu_array}}; + ASSERT_TRUE(stream_exec->SynchronousMemcpy( + &gpu_array_ptr, &cpu_array[0], cpu_array.size() * sizeof(int64))); + + gpu::DeviceMemory<int64> gpu_hdr_ptr{ + gpu::DeviceMemoryBase{gpu_array - 1}}; + // Clobber first word of the header. + float pi = 3.1417; + ASSERT_TRUE( + stream_exec->SynchronousMemcpy(&gpu_hdr_ptr, &pi, sizeof(float))); + + // Expect error on free. + a.Deallocate(gpu_array); + }, + ""); + } +} + +TEST(GPUDebugAllocatorTest, OverwriteDetection_Footer) { + for (int s : {8, 22}) { + EXPECT_DEATH( + { + const int device_id = 0; + GPUDebugAllocator a(new GPUBFCAllocator(device_id, 1 << 30), + device_id); + auto stream_exec = + GPUMachineManager()->ExecutorForDevice(device_id).ValueOrDie(); + + std::vector<int64> cpu_array(s); + memset(&cpu_array[0], 0, cpu_array.size() * sizeof(int64)); + int64* gpu_array = a.Allocate<int64>(cpu_array.size()); + + gpu::DeviceMemory<int64> gpu_array_ptr{ + gpu::DeviceMemoryBase{gpu_array}}; + ASSERT_TRUE(stream_exec->SynchronousMemcpy( + &gpu_array_ptr, &cpu_array[0], cpu_array.size() * sizeof(int64))); + + // Clobber word of the footer. 
+ gpu::DeviceMemory<int64> gpu_ftr_ptr{ + gpu::DeviceMemoryBase{gpu_array + s}}; + float pi = 3.1417; + ASSERT_TRUE( + stream_exec->SynchronousMemcpy(&gpu_ftr_ptr, &pi, sizeof(float))); + + // Expect error on free. + a.Deallocate(gpu_array); + }, + ""); + } +} + +TEST(GPUDebugAllocatorTest, ResetToNan) { + const int device_id = 0; + GPUNanResetAllocator a(new GPUBFCAllocator(device_id, 1 << 30), device_id); + auto stream_exec = + GPUMachineManager()->ExecutorForDevice(device_id).ValueOrDie(); + + std::vector<float> cpu_array(1024); + std::vector<float> cpu_array_result(1024); + + // Allocate 1024 floats + float* gpu_array = a.Allocate<float>(cpu_array.size()); + gpu::DeviceMemory<float> gpu_array_ptr{gpu::DeviceMemoryBase{gpu_array}}; + ASSERT_TRUE(stream_exec->SynchronousMemcpy(&cpu_array[0], gpu_array_ptr, + cpu_array.size() * sizeof(float))); + for (float f : cpu_array) { + ASSERT_FALSE(std::isfinite(f)); + } + + // Set one of the fields to 1.0. + cpu_array[0] = 1.0; + ASSERT_TRUE(stream_exec->SynchronousMemcpy(&gpu_array_ptr, &cpu_array[0], + cpu_array.size() * sizeof(float))); + // Copy the data back and verify. + ASSERT_TRUE( + stream_exec->SynchronousMemcpy(&cpu_array_result[0], gpu_array_ptr, + cpu_array_result.size() * sizeof(float))); + ASSERT_EQ(1.0, cpu_array_result[0]); + + // Free the array + a.Deallocate(gpu_array); + + // All values should be reset to nan. + ASSERT_TRUE( + stream_exec->SynchronousMemcpy(&cpu_array_result[0], gpu_array_ptr, + cpu_array_result.size() * sizeof(float))); + for (float f : cpu_array_result) { + ASSERT_FALSE(std::isfinite(f)); + } +} + +TEST(GPUDebugAllocatorTest, ResetToNanWithHeaderFooter) { + const int device_id = 0; + // NaN reset must be the outer-most allocator. + GPUNanResetAllocator a( + new GPUDebugAllocator(new GPUBFCAllocator(device_id, 1 << 30), device_id), + device_id); + auto stream_exec = + GPUMachineManager()->ExecutorForDevice(device_id).ValueOrDie(); + + std::vector<float> cpu_array(1024); + std::vector<float> cpu_array_result(1024); + + // Allocate 1024 floats + float* gpu_array = a.Allocate<float>(cpu_array.size()); + gpu::DeviceMemory<float> gpu_array_ptr{gpu::DeviceMemoryBase{gpu_array}}; + ASSERT_TRUE(stream_exec->SynchronousMemcpy(&cpu_array[0], gpu_array_ptr, + cpu_array.size() * sizeof(float))); + for (float f : cpu_array) { + ASSERT_FALSE(std::isfinite(f)); + } + + // Set one of the fields to 1.0. + cpu_array[0] = 1.0; + ASSERT_TRUE(stream_exec->SynchronousMemcpy(&gpu_array_ptr, &cpu_array[0], + cpu_array.size() * sizeof(float))); + // Copy the data back and verify. + ASSERT_TRUE( + stream_exec->SynchronousMemcpy(&cpu_array_result[0], gpu_array_ptr, + cpu_array_result.size() * sizeof(float))); + ASSERT_EQ(1.0, cpu_array_result[0]); + + // Free the array + a.Deallocate(gpu_array); + + // All values should be reset to nan. 
+ ASSERT_TRUE( + stream_exec->SynchronousMemcpy(&cpu_array_result[0], gpu_array_ptr, + cpu_array_result.size() * sizeof(float))); + for (float f : cpu_array_result) { + ASSERT_FALSE(std::isfinite(f)); + } +} + +TEST(GPUDebugAllocatorTest, TracksSizes) { + GPUDebugAllocator a(new GPUBFCAllocator(0, 1 << 30), 0); + EXPECT_EQ(true, a.TracksAllocationSizes()); +} + +TEST(GPUDebugAllocatorTest, AllocatedVsRequested) { + GPUNanResetAllocator a( + new GPUDebugAllocator(new GPUBFCAllocator(0, 1 << 30), 0), 0); + float* t1 = a.Allocate<float>(1); + EXPECT_EQ(4, a.RequestedSize(t1)); + EXPECT_EQ(256, a.AllocatedSize(t1)); + a.Deallocate(t1); +} + +} // namespace tensorflow + +#endif // GOOGLE_CUDA diff --git a/tensorflow/core/common_runtime/gpu/gpu_device.cc b/tensorflow/core/common_runtime/gpu/gpu_device.cc new file mode 100644 index 0000000000..26d34645f1 --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/gpu_device.cc @@ -0,0 +1,651 @@ +// TODO(opensource): Use a more generic sounding preprocessor name than +// GOOGLE_CUDA +#if GOOGLE_CUDA + +#define EIGEN_USE_GPU + +#include "tensorflow/core/common_runtime/gpu/gpu_device.h" + +#include <stdlib.h> +#include <string.h> + +//#include "base/commandlineflags.h" +#include "tensorflow/stream_executor/cuda/cuda_activation.h" +#include "tensorflow/stream_executor/multi_platform_manager.h" +#include "tensorflow/stream_executor/stream.h" +#include "tensorflow/stream_executor/stream_executor.h" +#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor" +#include "tensorflow/core/common_runtime/device_factory.h" +#include "tensorflow/core/common_runtime/gpu/gpu_event_mgr.h" +#include "tensorflow/core/common_runtime/gpu/gpu_init.h" +#include "tensorflow/core/common_runtime/gpu/gpu_stream_util.h" +#include "tensorflow/core/common_runtime/gpu/gpu_util.h" +#include "tensorflow/core/common_runtime/gpu/process_state.h" +#include "tensorflow/core/common_runtime/gpu_device_context.h" +#include "tensorflow/core/common_runtime/local_device.h" +#include "tensorflow/core/framework/allocator.h" +#include "tensorflow/core/framework/device_base.h" +#include "tensorflow/core/framework/op_kernel.h" +#include "tensorflow/core/framework/types.h" +#include "tensorflow/core/graph/types.h" +#include "tensorflow/core/lib/gtl/stl_util.h" +#include "tensorflow/core/lib/strings/numbers.h" +#include "tensorflow/core/lib/strings/strcat.h" +#include "tensorflow/core/platform/logging.h" +#include "tensorflow/core/platform/port.h" +#include "tensorflow/core/platform/tracing.h" +#include "tensorflow/core/public/session_options.h" +#include "tensorflow/core/public/status.h" +#include "tensorflow/core/public/tensor.h" +#include "tensorflow/core/util/device_name_utils.h" + +#if defined(PLATFORM_GOOGLE) +DEFINE_bool(brain_gpu_sync_every_op, false, + "If true, call GPUUtil::Sync() between every dispatched opkernel."); + +DEFINE_int32(brain_gpu_max_streams, 1, + "Max number of GPU streams to use for computation."); +#else +// TODO(opensource): These should be made options in some options struct, +// rather than flags. +bool FLAGS_brain_gpu_sync_every_op = false; +tensorflow::int32 FLAGS_brain_gpu_max_streams = 1; +#endif + +namespace gpu = ::perftools::gputools; + +namespace tensorflow { + +// Eigen Ops directly allocate memory only for temporary buffers used +// during OpKernel::Compute(). The recommended way of allocating such +// memory is via OpKernelContext::allocate_temp(). 
However, Eigen Ops +// don't have access to OpKernelContext, instead they get access to +// memory directly through the device allocator. As an Open Source +// project, Eigen assumes allocator semantics similar to those of the +// CUDA memory allocator, and may not work correctly due to race +// conditions if used with some other allocator. For safety, we need +// to delay deallocation calls out of Eigen until all events on the +// corresponding stream have completed. The following two classes +// serve this purpose in two different compilation environments. + +#if defined(__GCUDACC__) || defined(__GCUDACC_HOST__) +class EigenAllocator : public ::Eigen::Allocator { + public: + explicit EigenAllocator(gpu::Stream* stream, ::tensorflow::Allocator* alloc, + EventMgr* em) + : stream_(stream), allocator_(alloc), em_(em) {} + + void* allocate(size_t num_bytes) const override { + void* ret = allocator_->AllocateRaw(32 /* alignment */, num_bytes); + // Eigen doesn't typically check the return pointer from allocate, + // so we do it here and die with a more helpful error message. + if (ret == nullptr) { + LOG(FATAL) << "EigenAllocator for GPU ran out of memory when allocating " + << num_bytes << ". See error logs for more detailed info."; + } + return ret; + } + + void deallocate(void* buffer) const override { + em_->ThenDeleteBuffer(stream_, {allocator_, buffer}); + } + + private: + gpu::Stream* stream_; // Not owned. + ::tensorflow::Allocator* allocator_; // Not owned. + ::tensorflow::EventMgr* em_; // Not owned. + + TF_DISALLOW_COPY_AND_ASSIGN(EigenAllocator); +}; + +#else +class EigenCudaStreamDevice : public ::Eigen::StreamInterface { + public: + EigenCudaStreamDevice(const cudaStream_t* cuda_stream, int gpu_id, + ::tensorflow::Allocator* alloc) + : stream_(cuda_stream), allocator_(alloc) { + Eigen::initializeDeviceProp(); + device_prop_ = &Eigen::m_deviceProperties[gpu_id]; + } + + const cudaStream_t& stream() const override { return *stream_; } + const cudaDeviceProp& deviceProperties() const override { + return *device_prop_; + } + + void* allocate(size_t num_bytes) const override { + void* ret = allocator_->AllocateRaw(32 /* alignment */, num_bytes); + if (ret == nullptr) { + LOG(FATAL) << "EigenAllocator for GPU ran out of memory when allocating " + << num_bytes << ". See error logs for more detailed info."; + } + + return ret; + } + void deallocate(void* buffer) const override { + AsyncFreeData* afData = new AsyncFreeData(allocator_, buffer); + cudaError_t err = cudaStreamAddCallback(*stream_, asyncFree, afData, 0); + CHECK_EQ(err, cudaSuccess); + } + + private: + struct AsyncFreeData { + AsyncFreeData(::tensorflow::Allocator* a, void* p) + : allocator_(a), address_(p) {} + ::tensorflow::Allocator* allocator_; + void* address_; + }; + + static void CUDART_CB asyncFree(cudaStream_t stream, cudaError_t status, + void* userData) { + AsyncFreeData* data = static_cast<AsyncFreeData*>(userData); + data->allocator_->DeallocateRaw(data->address_); + delete data; + } + + const cudaStream_t* stream_; // Not owned. + const cudaDeviceProp* device_prop_; // Not owned. + ::tensorflow::Allocator* allocator_; // Not owned. 
+ + TF_DISALLOW_COPY_AND_ASSIGN(EigenCudaStreamDevice); +}; + +#endif + +BaseGPUDevice::BaseGPUDevice(const SessionOptions& options, const string& name, + Bytes memory_limit, BusAdjacency bus_adjacency, + int gpu_id, const string& physical_device_desc, + Allocator* gpu_allocator, Allocator* cpu_allocator) + : LocalDevice(options, Device::BuildDeviceAttributes( + name, DEVICE_GPU, memory_limit, bus_adjacency, + physical_device_desc), + gpu_allocator), + gpu_allocator_(gpu_allocator), + cpu_allocator_(cpu_allocator), + gpu_id_(gpu_id) { + gpu::StreamExecutor* executor = + GPUMachineManager()->ExecutorForDevice(gpu_id_).ValueOrDie(); + if (!executor) { + LOG(ERROR) << "Failed to get StreamExecutor for device " << gpu_id_; + return; + } + em_.reset(new EventMgr(executor)); + + if (FLAGS_brain_gpu_max_streams < 1) { + LOG(FATAL) << "Invalid value for brain_gpu_max_streams."; + } + + // Create the specified number of GPU streams + for (int i = 0; i < FLAGS_brain_gpu_max_streams; i++) { + auto stream = new gpu::Stream(executor); + stream->Init(); + VLOG(2) << "Created stream[" << i << "] = " << stream; + streams_.push_back(stream); + device_contexts_.push_back(new GPUDeviceContext(i, stream)); + } + gpu_device_info_ = new GpuDeviceInfo; + gpu_device_info_->stream = streams_[0]; + gpu_device_info_->default_context = device_contexts_[0]; + gpu_device_info_->event_mgr = em_.get(); + set_tensorflow_gpu_device_info(gpu_device_info_); +} + +BaseGPUDevice::~BaseGPUDevice() { + delete gpu_device_info_; + for (auto ctx : device_contexts_) ctx->Unref(); + gtl::STLDeleteElements(&streams_); +} + +Status BaseGPUDevice::FillContextMap(const Graph* graph, + DeviceContextMap* device_context_map) { + VLOG(2) << "FillContextMap"; + + const auto num_streams = streams_.size(); + // Special case for single stream. + if (num_streams == 1) { + return Status::OK(); + } + const int64 before = Env::Default()->NowMicros(); + gpu_stream_util::AssignStreamsOpts opts; + opts.max_streams = num_streams; + std::unordered_map<int, int> node_to_stream_id; + TF_RETURN_IF_ERROR( + gpu_stream_util::AssignStreams(graph, opts, &node_to_stream_id)); + int64 elapsed = Env::Default()->NowMicros() - before; + VLOG(3) << "AssignStreams took " << elapsed << "us"; + + // Fill in the context map. It is OK for this map to contain + // duplicate DeviceContexts so long as we increment the refcount. + for (Node* n : graph->nodes()) { + auto mapped_stream = node_to_stream_id[n->id()]; + CHECK_LE(mapped_stream, num_streams); + auto ctx = device_contexts_[mapped_stream]; + VLOG(3) << "Assigned stream " << node_to_stream_id[n->id()] + << " ==> stream[" << ctx->stream_id() << "] for node id " << n->id() + << " " << n->type_string() << " " << n->name(); + ctx->Ref(); + device_context_map->insert(std::make_pair(n->id(), ctx)); + } + + return Status::OK(); +} + +void BaseGPUDevice::Compute(OpKernel* op_kernel, OpKernelContext* context) { + // ScopedActivity is cheap when tracing is not active, but we + // can avoid computing the Hash64. + // TODO(pbar) This would no longer be needed if Ops have a unique id. + const uint64 id = port::Tracing::IsActive() ? 
Hash64(op_kernel->name()) : 0; + port::Tracing::ScopedActivity region(port::Tracing::EventCategory::kCompute, + id); + + GPUDeviceContext* gpu_device_context = device_contexts_[0]; + if (context->op_device_context() != nullptr) { + gpu_device_context = + static_cast<GPUDeviceContext*>(context->op_device_context()); + } + gpu::Stream* stream = gpu_device_context->stream(); + const auto stream_id = gpu_device_context->stream_id(); + + VLOG(1) << "GpuDevice::Compute " << op_kernel->name() << " op " + << op_kernel->def().op() << " on GPU" << gpu_id_ << " stream[" + << stream_id << "]"; + + // NOTE(tucker): We need to discriminate between Eigen GPU + // operations and all others. If an operation is Eigen + // implemented (or otherwise tries to launch a cuda kernel + // directly), we need to establish a stacked-scoped environment + // that directs it to execute on the proper device. Otherwise we + // expect the Op to use StreamExecutor directly and correctly. The + // way we make this discrimination is quite hacky: At the moment + // the only non-Eigen GPU Op is the recv-op, which is known to be + // asynchronous. + if (op_kernel->type_string() == "_Recv") { + context->SetStatus(errors::Internal( + "Invalid synchronous 'Compute' on GPU for '_Recv' op")); + } else { + const string label = + strings::StrCat(op_kernel->name(), ":", op_kernel->type_string()); + port::Tracing::ScopedAnnotation annotation(label); + + const auto num_streams = streams_.size(); + if (num_streams > 1) { + // If this op's device context is different from the other contexts, + // we must wait on the stream. + for (int i = 0; i < context->num_inputs(); ++i) { + const GPUDeviceContext* idc = + static_cast<GPUDeviceContext*>(context->input_device_context(i)); + OP_REQUIRES(context, idc != nullptr, + errors::Internal("Input device context ", i, + " was not set properly.")); + if (VLOG_IS_ON(2)) { + const void* base; + size_t len; + if (context->has_input(i)) { + if (IsRefType(context->input_dtype(i))) { + Tensor tensor = context->mutable_input(i, false); + base = DMAHelper::base(&tensor); + len = tensor.TotalBytes(); + } else { + const Tensor& tensor = context->input(i); + base = DMAHelper::base(&tensor); + len = tensor.TotalBytes(); + } + VLOG(2) << "Input " << i << " " << base << " " << len; + VLOG(2) << " stream[" << stream_id << "].ThenWaitFor(stream[" + << idc->stream_id() << "])" + << ((idc->stream() == stream) ? " not needed" : ""); + } + } + if (idc->stream() != stream) stream->ThenWaitFor(idc->stream()); + } + } + gpu::cuda::ScopedActivateExecutorContext scoped_activation{ + stream->parent(), gpu::cuda::MultiOpActivation::kYes}; + // Keep a copy of the inputs before Compute runs, in case they get + // deleted. TODO(misard) this will be fixed when the tracking is + // done right. + std::vector<Tensor>* tensor_refs = nullptr; + if (!FLAGS_brain_gpu_sync_every_op) { + tensor_refs = new std::vector<Tensor>; + tensor_refs->reserve(context->num_inputs() + context->num_outputs()); + for (int ii = 0; ii < context->num_inputs(); ++ii) { + if (context->has_input(ii)) { + if (IsRefType(context->input_dtype(ii))) { + Tensor in = context->mutable_input(ii, false); + tensor_refs->push_back(in); + } else { + const Tensor& in = context->input(ii); + tensor_refs->push_back(in); + } + } + } + } + op_kernel->Compute(context); + if (context->status().ok()) { + if (FLAGS_brain_gpu_sync_every_op) { + // Note: GPUUtil::Sync() only syncs the default stream. + // We need to either sync the stream used by this op, or + // all streams. 
Given that this flag is typically used for + // debugging it makes more sense to sync all GPU activity. + context->SetStatus(GPUUtil::SyncAll(this)); + } else { + // The GPU kernel has been queued, but may not complete for some + // time. As soon as this function completes, the caller will + // discard its refs on the inputs, outputs and any scratch + // tensors it created. Create additional refs here that will be + // held until the kernel completes. + for (int ii = 0; ii < context->num_temps(); ++ii) { + Tensor* temp = context->temp(ii); + VLOG(2) << "Saving ref to temp Tensor @ " << DMAHelper::base(temp); + tensor_refs->push_back(*temp); + } + for (int ii = 0; ii < context->num_outputs(); ++ii) { + Tensor* temp = context->mutable_output(ii); + if (nullptr != temp) { + tensor_refs->push_back(*temp); + } + } + em_->ThenDeleteTensors(stream, tensor_refs); + } + } else { + if (!FLAGS_brain_gpu_sync_every_op) { + delete tensor_refs; + } + } + } +} + +Status BaseGPUDevice::Sync() { return GPUUtil::Sync(this); } + +void BaseGPUDevice::ComputeAsync(AsyncOpKernel* op_kernel, + OpKernelContext* context, + AsyncOpKernel::DoneCallback done) { + GPUDeviceContext* gpu_device_context = device_contexts_[0]; + if (context->op_device_context() != nullptr) { + gpu_device_context = + static_cast<GPUDeviceContext*>(context->op_device_context()); + } + const auto stream_id = gpu_device_context->stream_id(); + + VLOG(1) << "GpuDevice::ComputeAsync " << op_kernel->name() << " op " + << op_kernel->def().op() << " on GPU" << gpu_id_ << " stream[" + << stream_id << "]"; + + port::Tracing::TraceMe activity( + strings::StrCat(op_kernel->name(), ":", op_kernel->type_string())); + op_kernel->ComputeAsync(context, done); +} + +Status BaseGPUDevice::MakeTensorFromProto(const TensorProto& tensor_proto, + const AllocatorAttributes alloc_attrs, + Tensor* tensor) { + AllocatorAttributes attr; + attr.set_on_host(true); + attr.set_gpu_compatible(true); + Allocator* host_alloc = GetAllocator(attr); + Tensor parsed(tensor_proto.dtype()); + if (!parsed.FromProto(host_alloc, tensor_proto)) { + return errors::InvalidArgument("Cannot parse tensor from proto: ", + tensor_proto.DebugString()); + } + Status status; + if (alloc_attrs.on_host()) { + *tensor = parsed; + } else { + if (!DMAHelper::CanUseDMA(&parsed)) { + return errors::Internal("GPU copy from non-DMA ", + DataTypeString(parsed.dtype()), " tensor"); + } + Tensor copy(GetAllocator(alloc_attrs), parsed.dtype(), parsed.shape()); + port::Tracing::ScopedAnnotation annotation("MakeTensorFromProto"); + Notification n; + device_contexts_[0]->CopyCPUTensorToDevice(&parsed, this, ©, + [&n, &status](const Status& s) { + status = s; + n.Notify(); + }); + n.WaitForNotification(); + *tensor = copy; + } + return status; +} + +namespace { +#if defined(__GCUDACC__) || defined(__GCUDACC_HOST__) +class ConcretePerOpGpuDevice : public PerOpGpuDevice { + public: + explicit ConcretePerOpGpuDevice(gpu::Stream* stream, + EigenAllocator* allocator) + : device_(stream, allocator), allocator_(allocator) {} + ~ConcretePerOpGpuDevice() { delete allocator_; } + + const Eigen::GpuDevice& device() const override { return device_; } + + private: + Eigen::GpuDevice device_; + EigenAllocator* allocator_; +}; +#else +class ConcretePerOpGpuDevice : public PerOpGpuDevice { + public: + explicit ConcretePerOpGpuDevice(EigenCudaStreamDevice* stream_device) + : device_(stream_device), stream_device_(stream_device) {} + ~ConcretePerOpGpuDevice() { delete stream_device_; } + + const Eigen::GpuDevice& device() const 
override { return device_; } + + private: + Eigen::GpuDevice device_; + EigenCudaStreamDevice* stream_device_; +}; +#endif +} // namespace + +const PerOpGpuDevice* BaseGPUDevice::NewDevice(int stream_id, + Allocator* allocator) { +#if defined(__GCUDACC__) || defined(__GCUDACC_HOST__) + auto ea = new EigenAllocator(streams_[stream_id], allocator, em_.get()); + return new ConcretePerOpGpuDevice(streams_[stream_id], ea); +#else + const cudaStream_t* cuda_stream = reinterpret_cast<const cudaStream_t*>( + streams_[stream_id]->implementation()->CudaStreamMemberHack()); + auto es = new EigenCudaStreamDevice(cuda_stream, gpu_id_, allocator); + return new ConcretePerOpGpuDevice(es); +#endif +} + +const PerOpGpuDevice* BaseGPUDevice::MakeGpuDevice(DeviceContext* dc, + Allocator* allocator) { + if (dc) { + const GPUDeviceContext* gpu_dc = static_cast<GPUDeviceContext*>(dc); + const int stream_id = gpu_dc->stream_id(); + VLOG(1) << " eigen_gpu_device(" << dc << ") => stream[" << stream_id + << "]"; + CHECK_LT(stream_id, streams_.size()); + return NewDevice(stream_id, allocator); + } else { + return NewDevice(0, allocator); + } +} + +void BaseGPUDeviceFactory::CreateDevices(const SessionOptions& options, + const string& name_prefix, + std::vector<Device*>* devices) { + int n = INT_MAX; + auto iter = options.config.device_count().find("GPU"); + if (iter != options.config.device_count().end()) { + n = iter->second; + } + std::vector<int> valid_gpu_ids; + GetValidDeviceIds(&valid_gpu_ids); + if (static_cast<size_t>(n) > valid_gpu_ids.size()) { + n = valid_gpu_ids.size(); + } + for (int i = 0; i < n; i++) { + devices->push_back(CreateGPUDevice( + options, strings::StrCat(name_prefix, "/gpu:", i), valid_gpu_ids[i])); + } +} + +namespace { +int64 MinSystemMemory(int64 available_memory) { + // We use the following heuristic for now: + // + // If the available_memory is < 2GiB, we allocate 200MiB to system memory. + // Otherwise, allocate 300MiB to system memory. + // + // In the future we could be more sophisticated by using a table of + // devices. + if (available_memory < (1LL << 31)) { + // 200MiB + return 209715200LL; + } else { + // max(300 MiB, 0.95 * available_memory) + return std::max(314572800LL, static_cast<int64>(available_memory * 0.05)); + } +} +} // namespace + +static string GetShortDeviceDescription(int device_id, + const gpu::DeviceDescription& desc) { + return strings::StrCat("device: ", device_id, ", name: ", desc.name(), + ", pci bus id: ", desc.pci_bus_id()); +} + +LocalDevice* BaseGPUDeviceFactory::CreateGPUDevice( + const SessionOptions& options, const string& name, int gpu_id) { + CHECK_GE(gpu_id, 0); + + // Look up the device, to see its attributes. 
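// For illustration (numbers are editorial, not from the original change):
// with per_process_gpu_memory_fraction left at its default of 0, the code
// below reserves max(300 MiB, 5% of the reported available memory) for the
// system (200 MiB on cards reporting less than 2 GiB) and hands the rest to
// the allocator, so a card reporting 4 GiB free yields a budget of roughly
// 4096 - 300 = 3796 MiB. A nonzero fraction skips that reservation and simply
// scales the reported memory, e.g. 0.5 -> ~2048 MiB.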
+ gpu::Platform* gpu_platform = GPUMachineManager(); + CHECK_LT(gpu_id, gpu_platform->VisibleDeviceCount()); + gpu::StreamExecutor* se = + gpu_platform->ExecutorForDevice(gpu_id).ValueOrDie(); + const gpu::DeviceDescription& desc = se->GetDeviceDescription(); + + int64 total_memory, available_memory; + CHECK(se->DeviceMemoryUsage(&available_memory, &total_memory)); + + int64 allocated_memory = available_memory; + double config_memory_fraction = + options.config.gpu_options().per_process_gpu_memory_fraction(); + if (config_memory_fraction == 0) { + const int64 min_system_memory = MinSystemMemory(available_memory); + if (min_system_memory < allocated_memory) { + allocated_memory -= min_system_memory; + } + } else { + allocated_memory *= config_memory_fraction; + } + + Bytes allocated_bytes = static_cast<Bytes>(allocated_memory); + + // Get GPU BusAdjacency from its reported NUMA affinity. + // Because GPUs are virtualized in some environments, we can't just + // use the GPU id. + BusAdjacency bus_adjacency = BUS_ANY; + switch (desc.numa_node()) { + case 0: + bus_adjacency = BUS_0; + break; + case 1: + bus_adjacency = BUS_1; + break; + default: + bus_adjacency = BUS_ANY; + } + VLOG(1) << "GPUDevice id " << gpu_id << " on bus " << bus_adjacency + << " numa: " << desc.numa_node() << " pci: " << desc.pci_bus_id(); + + ProcessState* process_state = ProcessState::singleton(); + return CreateGPUDevice( + options, name, allocated_bytes, bus_adjacency, gpu_id, + GetShortDeviceDescription(gpu_id, desc), + process_state->GetGPUAllocator(gpu_id, allocated_memory), + process_state->GetCPUAllocator(desc.numa_node())); +} + +static int GetMinGPUMultiprocessorCount() { + static const int kDefaultMinGPUMultiprocessorCount = 8; + + const char* tf_min_gpu_core_count = getenv("TF_MIN_GPU_MULTIPROCESSOR_COUNT"); + + if (tf_min_gpu_core_count == nullptr || + strcmp(tf_min_gpu_core_count, "") == 0) { + return kDefaultMinGPUMultiprocessorCount; + } + + int min_gpu_core_count = -1; + if (strings::safe_strto32(tf_min_gpu_core_count, &min_gpu_core_count)) { + if (min_gpu_core_count >= 0) { + return min_gpu_core_count; + } + } + + LOG(ERROR) << "Invalid minimum GPU multiprocessor count: [" + << tf_min_gpu_core_count << "]. " + << "Using the default value: " + << kDefaultMinGPUMultiprocessorCount; + return kDefaultMinGPUMultiprocessorCount; +} + +void BaseGPUDeviceFactory::GetValidDeviceIds(std::vector<int>* ids) { + auto gpu_manager = GPUMachineManager(); + int min_gpu_core_count = GetMinGPUMultiprocessorCount(); + if (gpu_manager) { + auto visible_device_count = gpu_manager->VisibleDeviceCount(); + for (int i = 0; i < gpu_manager->VisibleDeviceCount(); ++i) { + auto exec_status = gpu_manager->ExecutorForDevice(i); + if (!exec_status.ok()) { + continue; + } + gpu::StreamExecutor* se = exec_status.ValueOrDie(); + const gpu::DeviceDescription& desc = se->GetDeviceDescription(); + int major, minor; + if (!desc.cuda_compute_capability(&major, &minor)) { + continue; + } + // Only consider GPUs with compute capability >= 3.5 (Kepler or + // higher) + if (major < 3 || (major == 3 && minor < 5)) { + LOG(INFO) << "Ignoring gpu device " + << "(" << GetShortDeviceDescription(i, desc) << ") " + << "with Cuda compute capability " << major << "." << minor + << ". The minimum required Cuda capability is 3.5."; + continue; + } + + // TensorFlow currently places computation on devices assuming + // they have similar capability. 
+ // + // If there are multiple GPUs available on the machine, only + // consider GPUs with 8 or more multiprocessors. + // + // TODO(vrv): In the medium term: we should only filter out GPUs + // that are slow relative to the fastest GPU. In the long term, + // TensorFlow should support automatic placement based on + // capability. + if (visible_device_count > 1) { + if (desc.core_count() < min_gpu_core_count) { + LOG(INFO) << "Ignoring gpu device " + << "(" << GetShortDeviceDescription(i, desc) << ") " + << "with Cuda multiprocessor count: " << desc.core_count() + << ". The minimum required count is " << min_gpu_core_count + << ". You can adjust this requirement with the env var " + "TF_MIN_GPU_MULTIPROCESSOR_COUNT."; + continue; + } + } + + int new_id = ids->size(); + ids->push_back(i); + + LOG(INFO) << "Creating TensorFlow device (/gpu:" << new_id << ") -> " + << "(" << GetShortDeviceDescription(i, desc) << ")"; + } + } +} + +} // namespace tensorflow + +#endif // GOOGLE_CUDA diff --git a/tensorflow/core/common_runtime/gpu/gpu_device.h b/tensorflow/core/common_runtime/gpu/gpu_device.h new file mode 100644 index 0000000000..a415224d95 --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/gpu_device.h @@ -0,0 +1,94 @@ +#if !GOOGLE_CUDA +#error This file must only be included when building with Cuda support +#endif + +#ifndef TENSORFLOW_COMMON_RUNTIME_GPU_GPU_DEVICE_H_ +#define TENSORFLOW_COMMON_RUNTIME_GPU_GPU_DEVICE_H_ + +#include "tensorflow/core/common_runtime/device_factory.h" +#include "tensorflow/core/common_runtime/gpu_device_context.h" +#include "tensorflow/core/common_runtime/local_device.h" +#include "tensorflow/core/framework/allocator.h" +#include "tensorflow/core/framework/device_base.h" +#include "tensorflow/core/framework/op_kernel.h" +#include "tensorflow/core/platform/port.h" +#include "tensorflow/core/public/session_options.h" +#include "tensorflow/core/public/status.h" +#include "tensorflow/core/public/tensor.h" +#include "tensorflow/core/common_runtime/gpu/gpu_event_mgr.h" +#include "tensorflow/stream_executor/stream.h" +#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor" + +namespace tensorflow { + +class EigenAllocator; + +class BaseGPUDevice : public LocalDevice { + public: + BaseGPUDevice(const SessionOptions& options, const string& name, + Bytes memory_limit, BusAdjacency bus_adjacency, int gpu_id, + const string& physical_device_desc, Allocator* gpu_allocator, + Allocator* cpu_allocator); + + ~BaseGPUDevice() override; + + // GPU devices require the Op Compute method to save a reference to + // any temporary tensors that are allocated until the Op execution + // completes. + bool SaveTemporaryTensors() const override { return true; } + + Status FillContextMap(const Graph* graph, + DeviceContextMap* device_context_map); + + void Compute(OpKernel* op_kernel, OpKernelContext* context) override; + + Status Sync() override; + + void ComputeAsync(AsyncOpKernel* op_kernel, OpKernelContext* context, + AsyncOpKernel::DoneCallback done) override; + + Status MakeTensorFromProto(const TensorProto& tensor_proto, + const AllocatorAttributes alloc_attrs, + Tensor* tensor) override; + + // The caller owns the returned device. 
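  // A minimal usage sketch (hypothetical caller code, not part of this
  // class): the executor is expected to do roughly
  //   const PerOpGpuDevice* d = gpu_device->MakeGpuDevice(op_device_context,
  //                                                       allocator);
  //   ... evaluate the op's Eigen expressions against d->device() ...
  //   delete d;
  // i.e. create one short-lived PerOpGpuDevice per Op invocation and delete
  // it once the Op is done with it.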
+ const PerOpGpuDevice* MakeGpuDevice(DeviceContext* dc, + Allocator* allocator) override; + + protected: + Allocator* gpu_allocator_; // not owned + Allocator* cpu_allocator_; // not owned + + private: + std::vector<gpu::Stream*> streams_; + std::vector<GPUDeviceContext*> device_contexts_; + GpuDeviceInfo* gpu_device_info_ = nullptr; + mutex trace_mu_; + int gpu_id_ = -1; + std::unique_ptr<EventMgr> em_; + + const PerOpGpuDevice* NewDevice(int stream_id, Allocator* allocator); +}; + +class BaseGPUDeviceFactory : public DeviceFactory { + public: + void CreateDevices(const SessionOptions& options, const string& name_prefix, + std::vector<Device*>* devices) override; + + private: + LocalDevice* CreateGPUDevice(const SessionOptions& options, + const string& name, int gpu_id); + + virtual LocalDevice* CreateGPUDevice(const SessionOptions& options, + const string& name, Bytes memory_limit, + BusAdjacency bus_adjacency, int gpu_id, + const string& physical_device_desc, + Allocator* gpu_allocator, + Allocator* cpu_allocator) = 0; + + void GetValidDeviceIds(std::vector<int>* ids); +}; + +} // namespace tensorflow + +#endif // TENSORFLOW_COMMON_RUNTIME_GPU_GPU_DEVICE_H_ diff --git a/tensorflow/core/common_runtime/gpu/gpu_device_factory.cc b/tensorflow/core/common_runtime/gpu/gpu_device_factory.cc new file mode 100644 index 0000000000..240ac47499 --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/gpu_device_factory.cc @@ -0,0 +1,52 @@ +#if GOOGLE_CUDA + +#define EIGEN_USE_GPU + +#include "tensorflow/core/common_runtime/gpu/gpu_device.h" +#include "tensorflow/core/common_runtime/gpu/process_state.h" + +namespace tensorflow { + +void RequireGPUDevice() {} + +class GPUDevice : public BaseGPUDevice { + public: + GPUDevice(const SessionOptions& options, const string& name, + Bytes memory_limit, BusAdjacency bus_adjacency, int gpu_id, + const string& physical_device_desc, Allocator* gpu_allocator, + Allocator* cpu_allocator) + : BaseGPUDevice(options, name, memory_limit, bus_adjacency, gpu_id, + physical_device_desc, gpu_allocator, cpu_allocator) {} + + Allocator* GetAllocator(AllocatorAttributes attr) override { + if (attr.on_host()) { + ProcessState* ps = ProcessState::singleton(); + if (attr.gpu_compatible()) { + return ps->GetCUDAHostAllocator(0); + } else { + return cpu_allocator_; + } + } else { + return gpu_allocator_; + } + } +}; + +class GPUDeviceFactory : public BaseGPUDeviceFactory { + private: + LocalDevice* CreateGPUDevice(const SessionOptions& options, + const string& name, Bytes memory_limit, + BusAdjacency bus_adjacency, int gpu_id, + const string& physical_device_desc, + Allocator* gpu_allocator, + Allocator* cpu_allocator) override { + return new GPUDevice(options, name, memory_limit, bus_adjacency, gpu_id, + physical_device_desc, gpu_allocator, cpu_allocator); + } +}; + +REGISTER_LOCAL_DEVICE_FACTORY("GPU", GPUDeviceFactory); + +} // namespace tensorflow + +#endif // GOOGLE_CUDA diff --git a/tensorflow/core/common_runtime/gpu/gpu_event_mgr.cc b/tensorflow/core/common_runtime/gpu/gpu_event_mgr.cc new file mode 100644 index 0000000000..29d6281733 --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/gpu_event_mgr.cc @@ -0,0 +1,132 @@ +#include "tensorflow/core/common_runtime/gpu/gpu_event_mgr.h" + +#include "tensorflow/stream_executor/event.h" +#include "tensorflow/stream_executor/stream.h" + +namespace gpu = ::perftools::gputools; + +namespace tensorflow { + +EventMgr::EventMgr(gpu::StreamExecutor* se) + : exec_(se), + // threadpool_ has 1 thread for the polling loop, and one to 
execute + // event callback functions. Maybe we should have more? + threadpool_(Env::Default(), "GPU_Event_Manager", 2) { + threadpool_.Schedule([this]() { PollLoop(); }); +} + +EventMgr::~EventMgr() { + stop_polling_.Notify(); + // Shut down the backup polling loop. + polling_stopped_.WaitForNotification(); + + // Events are owned by this object. + for (auto& e : free_events_) { + delete e; + } + while (!used_events_.empty()) { + delete used_events_[0].event; + delete used_events_[0].mem; + if (used_events_[0].bufrec.buf) { + used_events_[0].bufrec.alloc->DeallocateRaw(used_events_[0].bufrec.buf); + } + if (used_events_[0].func != nullptr) + threadpool_.Schedule(used_events_[0].func); + used_events_.pop_front(); + } +} + +// This polling loop runs at a relatively low frequency. Most calls to +// PollEvents() should come directly from Compute() via +// ThenDeleteTensors(). This function's purpose is to ensure that +// even if no more GPU operations are being requested, we still +// eventually clear the queue. It seems to prevent some tensorflow +// programs from stalling for reasons not yet understood. +void EventMgr::PollLoop() { + while (!stop_polling_.HasBeenNotified()) { + Env::Default()->SleepForMicroseconds(1 * 1000); + { + mutex_lock l(mu_); + PollEvents(true); + } + } + polling_stopped_.Notify(); +} + +void EventMgr::QueueInUse(gpu::Stream* stream, InUse iu) { + VLOG(2) << "QueueInUse free_events_ " << free_events_.size() + << " used_events_ " << used_events_.size(); + // Events are created on demand, and repeatedly reused. There is no + // limit placed here on the number of allocated Events. + if (free_events_.empty()) { + free_events_.push_back(new gpu::Event(exec_)); + free_events_.back()->Init(); + } + gpu::Event* e = free_events_.back(); + free_events_.pop_back(); + stream->ThenRecordEvent(e); + iu.event = e; + used_events_.push_back(iu); +} + +// This function must be called periodically to check whether pending +// events have recorded, and then retire them. Initial observations +// suggest that typical behavior in a TensorFlow program is to have +// 0-3 events pending most of the time, but there are occasionally +// spikes of up to several hundred outstanding. +// +// NOTE: If all events are on the same stream, no later event will +// complete before an earlier event, except possibly if the earlier +// event transitions to an error state, so there's no advantage in +// looking past the first kPending event. However, if we're using +// multiple streams there may be some gain in looking deeper. +// As a compromise, PollEvent() calls that are triggered by the queueing +// of a single event never look past the first kPending event. Calls +// coming from the dedicated polling thread always sweep the full queue. +// +// Note that allowing the queue to grow very long could cause overall +// GPU memory use to spike needlessly. An alternative strategy would +// be to throttle new Op execution until the pending event queue +// clears. +void EventMgr::PollEvents(bool is_dedicated_poller) { + VLOG(2) << "PollEvents free_events_ " << free_events_.size() + << " used_events_ " << used_events_.size(); + // Sweep the remaining events in order. If this is the dedicated + // polling thread, check the entire set. Otherwise, just sweep up to + // the first non-complete record that is still pending. 
+ for (auto& iu : used_events_) { + if (iu.event == nullptr) continue; + gpu::Event::Status s = iu.event->PollForStatus(); + switch (s) { + case gpu::Event::Status::kUnknown: + case gpu::Event::Status::kError: + // We don't expect to see these. Someday maybe propagate + // a Status error, but for now fail hard. + LOG(FATAL) << "Unexpected Event status: " << static_cast<int>(s); + break; + case gpu::Event::Status::kPending: + if (!is_dedicated_poller) return; // quit processing queue + break; + case gpu::Event::Status::kComplete: + delete iu.mem; + if (iu.bufrec.buf) iu.bufrec.alloc->DeallocateRaw(iu.bufrec.buf); + // The function must be called in another thread, outside of + // the mutex held here. + if (iu.func != nullptr) threadpool_.Schedule(iu.func); + free_events_.push_back(iu.event); + // Mark this InUse record as completed. + iu.event = nullptr; + } + } + // Then clear any completed InUse records from the front of the queue. + while (!used_events_.empty()) { + InUse& iu = used_events_.front(); + if (iu.event == nullptr) { + used_events_.pop_front(); + } else { + break; + } + } +} + +} // namespace tensorflow diff --git a/tensorflow/core/common_runtime/gpu/gpu_event_mgr.h b/tensorflow/core/common_runtime/gpu/gpu_event_mgr.h new file mode 100644 index 0000000000..f9436566d4 --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/gpu_event_mgr.h @@ -0,0 +1,118 @@ +#ifndef TENSORFLOW_COMMON_RUNTIME_GPU_GPU_EVENT_MGR_H_ +#define TENSORFLOW_COMMON_RUNTIME_GPU_GPU_EVENT_MGR_H_ + +#include <deque> +#include <vector> +#include "tensorflow/core/lib/core/notification.h" +#include "tensorflow/core/lib/core/threadpool.h" +#include "tensorflow/core/platform/port.h" +#include "tensorflow/core/platform/thread_annotations.h" +#include "tensorflow/core/public/tensor.h" + +namespace perftools { +namespace gputools { +class Event; +class Stream; +class StreamExecutor; +} // namespace gputools +} // namespace perftools + +namespace tensorflow { + +// An object to keep track of pending Events in the StreamExecutor streams +// and associated Tensors that cannot safely be deleted until the associated +// Events are recorded. +class EventMgr { + public: + explicit EventMgr(perftools::gputools::StreamExecutor* se); + + ~EventMgr(); + + // Takes ownership of *tensors and deletes it as soon as all events + // currently enqueued on *stream have completed. + inline void ThenDeleteTensors(perftools::gputools::Stream* stream, + std::vector<Tensor>* tensors) { + mutex_lock l(mu_); + QueueTensors(stream, tensors); + PollEvents(false); + } + + struct BufRec { + Allocator* alloc; + void* buf; + }; + + // Takes ownership of *bufrec.buf and calls bufrec.alloc->DeallocateRaw() + // on it as soon as all events currently enqueued on *stream have completed. + inline void ThenDeleteBuffer(perftools::gputools::Stream* stream, + BufRec bufrec) { + mutex_lock l(mu_); + QueueBuffer(stream, bufrec); + PollEvents(false); + } + + inline void ThenExecute(perftools::gputools::Stream* stream, + std::function<void()> func) { + mutex_lock l(mu_); + QueueFunc(stream, func); + PollEvents(false); + } + + private: + friend class TEST_EventMgrHelper; + mutex mu_; + perftools::gputools::StreamExecutor* exec_; + + struct InUse { + perftools::gputools::Event* event; + std::vector<Tensor>* mem; + BufRec bufrec; + std::function<void()> func; + }; + + // Stream-enqueue an unused Event and save with it a collection of + // Tensors and/or a BufRec to be deleted only after the Event + // records. 
+ void QueueInUse(perftools::gputools::Stream* stream, InUse in_use) + EXCLUSIVE_LOCKS_REQUIRED(mu_); + + void QueueTensors(perftools::gputools::Stream* stream, + std::vector<Tensor>* tensors) + EXCLUSIVE_LOCKS_REQUIRED(mu_) { + QueueInUse(stream, {nullptr, tensors, BufRec(), nullptr}); + } + + void QueueBuffer(perftools::gputools::Stream* stream, BufRec bufrec) + EXCLUSIVE_LOCKS_REQUIRED(mu_) { + QueueInUse(stream, {nullptr, nullptr, bufrec, nullptr}); + } + + void QueueFunc(perftools::gputools::Stream* stream, + std::function<void()> func) EXCLUSIVE_LOCKS_REQUIRED(mu_) { + QueueInUse(stream, {nullptr, nullptr, BufRec(), func}); + } + + // This function should be called at roughly the same tempo as + // QueueTensors() to check whether pending events have recorded, + // and then retire them. + void PollEvents(bool is_dedicated_poller) EXCLUSIVE_LOCKS_REQUIRED(mu_); + + // An internal polling loop that runs at a low frequency to clear + // straggler Events. + void PollLoop(); + + // A stack of unused events + std::vector<perftools::gputools::Event*> free_events_ GUARDED_BY(mu_); + + // A FIFO queue of InUse events and associated tensors. + std::deque<InUse> used_events_ GUARDED_BY(mu_); + + Notification stop_polling_; + Notification polling_stopped_; + + // The main PollLoop for the event manager runs in this threadpool. + thread::ThreadPool threadpool_; +}; + +} // namespace tensorflow +#endif // TENSORFLOW_COMMON_RUNTIME_GPU_GPU_EVENT_MGR_H_ diff --git a/tensorflow/core/common_runtime/gpu/gpu_event_mgr_test.cc b/tensorflow/core/common_runtime/gpu/gpu_event_mgr_test.cc new file mode 100644 index 0000000000..30ca1ff187 --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/gpu_event_mgr_test.cc @@ -0,0 +1,152 @@ +#if GOOGLE_CUDA + +#include "tensorflow/core/common_runtime/gpu/gpu_event_mgr.h" + +#include "tensorflow/core/common_runtime/gpu/gpu_init.h" +#include "tensorflow/stream_executor/multi_platform_manager.h" +#include "tensorflow/stream_executor/stream_executor.h" +#include <gtest/gtest.h> + +namespace gpu = ::perftools::gputools; + +namespace tensorflow { + +class TEST_EventMgrHelper { + public: + explicit TEST_EventMgrHelper(EventMgr* em) : em_(em) {} + + int queue_size() { + mutex_lock l(em_->mu_); + return em_->used_events_.size(); + } + + int free_size() { + mutex_lock l(em_->mu_); + return em_->free_events_.size(); + } + + void QueueTensors(perftools::gputools::Stream* stream, + std::vector<Tensor>* tensors) { + mutex_lock l(em_->mu_); + em_->QueueTensors(stream, tensors); + } + + void PollEvents(bool is_dedicated_poller) { + mutex_lock l(em_->mu_); + em_->PollEvents(is_dedicated_poller); + } + + private: + EventMgr* em_; +}; + +namespace { + +TEST(EventMgr, Empty) { + auto stream_exec = GPUMachineManager()->ExecutorForDevice(0).ValueOrDie(); + EventMgr em(stream_exec); + TEST_EventMgrHelper th(&em); + EXPECT_EQ(0, th.queue_size()); + EXPECT_EQ(0, th.free_size()); +} + +// Delaying polling until after several enqueings should grow the +// total number of allocated events. Once we have enough events for +// the max simultaneously pending, we should not allocate any more. 
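// A minimal sketch of the client pattern EventMgr is built for. This helper
// is illustrative only and is not used by the tests; its name and arguments
// are hypothetical, while ThenDeleteTensors() and ThenExecute() are the real
// EventMgr API shown above.
void ExampleDeferredCleanup(EventMgr* em, gpu::Stream* stream,
                            std::vector<Tensor>* inputs_used_by_kernels) {
  // Kernels reading *inputs_used_by_kernels have already been queued on
  // 'stream'. EventMgr takes ownership of the vector and deletes it only
  // after an event recorded behind that work has completed.
  em->ThenDeleteTensors(stream, inputs_used_by_kernels);
  // Host callbacks can be deferred past the same point.
  em->ThenExecute(stream, []() { /* runs once the queued work is done */ });
}
// (The DelayedPolling test below exercises the event-pool growth described in
// the comment above.)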
+TEST(EventMgr, DelayedPolling) { + auto stream_exec = GPUMachineManager()->ExecutorForDevice(0).ValueOrDie(); + EventMgr em(stream_exec); + TEST_EventMgrHelper th(&em); + EXPECT_EQ(0, th.queue_size()); + std::vector<Tensor>* v = nullptr; + std::unique_ptr<gpu::Stream> stream(new gpu::Stream(stream_exec)); + CHECK(stream.get()); + stream->Init(); + for (int i = 0; i < 5; ++i) { + v = new std::vector<Tensor>; + th.QueueTensors(stream.get(), v); + EXPECT_EQ(i + 1, th.queue_size()); + EXPECT_EQ(0, th.free_size()); + } + th.PollEvents(false); + EXPECT_EQ(0, th.queue_size()); + EXPECT_EQ(5, th.free_size()); + for (int j = 0; j < 2; ++j) { + for (int i = 0; i < 5; ++i) { + v = new std::vector<Tensor>; + th.QueueTensors(stream.get(), v); + EXPECT_EQ(i + 1, th.queue_size()); + EXPECT_EQ(4 - i, th.free_size()); + } + th.PollEvents(false); + EXPECT_EQ(0, th.queue_size()); + EXPECT_EQ(5, th.free_size()); + } +} + +// Immediate polling should require only one event to be allocated. +TEST(EventMgr, ImmediatePolling) { + auto stream_exec = GPUMachineManager()->ExecutorForDevice(0).ValueOrDie(); + EventMgr em(stream_exec); + TEST_EventMgrHelper th(&em); + EXPECT_EQ(0, th.queue_size()); + EXPECT_EQ(0, th.free_size()); + std::vector<Tensor>* v = nullptr; + std::unique_ptr<gpu::Stream> stream(new gpu::Stream(stream_exec)); + CHECK(stream.get()); + stream->Init(); + for (int i = 0; i < 5; ++i) { + v = new std::vector<Tensor>; + em.ThenDeleteTensors(stream.get(), v); + EXPECT_EQ(0, th.queue_size()); + EXPECT_EQ(1, th.free_size()); + } +} + +// If we delay polling by more than 1 second, the backup polling loop +// should clear the queue. +TEST(EventMgr, LongDelayedPolling) { + auto stream_exec = GPUMachineManager()->ExecutorForDevice(0).ValueOrDie(); + EventMgr em(stream_exec); + TEST_EventMgrHelper th(&em); + EXPECT_EQ(0, th.queue_size()); + EXPECT_EQ(0, th.free_size()); + std::vector<Tensor>* v = nullptr; + std::unique_ptr<gpu::Stream> stream(new gpu::Stream(stream_exec)); + CHECK(stream.get()); + stream->Init(); + for (int i = 0; i < 5; ++i) { + v = new std::vector<Tensor>; + th.QueueTensors(stream.get(), v); + EXPECT_EQ(1 + i, th.queue_size()); + EXPECT_EQ(0, th.free_size()); + } + sleep(1); + EXPECT_EQ(0, th.queue_size()); + EXPECT_EQ(5, th.free_size()); +} + +// Deleting the EventMgr when events are still pending should shut +// down gracefully. 
+TEST(EventMgr, NonEmptyShutdown) { + auto stream_exec = GPUMachineManager()->ExecutorForDevice(0).ValueOrDie(); + EventMgr em(stream_exec); + TEST_EventMgrHelper th(&em); + EXPECT_EQ(0, th.queue_size()); + EXPECT_EQ(0, th.free_size()); + std::vector<Tensor>* v = nullptr; + std::unique_ptr<gpu::Stream> stream(new gpu::Stream(stream_exec)); + CHECK(stream.get()); + stream->Init(); + for (int i = 0; i < 5; ++i) { + v = new std::vector<Tensor>; + th.QueueTensors(stream.get(), v); + EXPECT_EQ(1 + i, th.queue_size()); + EXPECT_EQ(0, th.free_size()); + } +} + +} // namespace +} // namespace tensorflow + +#endif // GOOGLE_CUDA diff --git a/tensorflow/core/common_runtime/gpu/gpu_init.cc b/tensorflow/core/common_runtime/gpu/gpu_init.cc new file mode 100644 index 0000000000..631a47eb91 --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/gpu_init.cc @@ -0,0 +1,147 @@ +#include "tensorflow/core/common_runtime/gpu/gpu_init.h" + +#include <string> + +#include "tensorflow/core/platform/port.h" +#include "tensorflow/core/platform/logging.h" +#include "tensorflow/stream_executor/multi_platform_manager.h" +#include "tensorflow/stream_executor/stream_executor.h" +#include "tensorflow/core/lib/core/errors.h" +#include "tensorflow/core/lib/strings/numbers.h" +#include "tensorflow/core/lib/strings/strcat.h" + +namespace gpu = ::perftools::gputools; + +namespace tensorflow { + +namespace { + +std::unique_ptr<std::map<std::pair<int, int>, bool>> GetPeerAccessMap( + gpu::Platform* platform, int device_count) { + auto* map = new std::map<std::pair<int, int>, bool>; + for (int i = 0; i < device_count; ++i) { + for (int j = 0; j < device_count; ++j) { + gpu::StreamExecutor* from = platform->ExecutorForDevice(i).ValueOrDie(); + gpu::StreamExecutor* to = platform->ExecutorForDevice(j).ValueOrDie(); + (*map)[{i, j}] = from->CanEnablePeerAccessTo(to); + } + } + + return std::unique_ptr<std::map<std::pair<int, int>, bool>>{map}; +} + +Status EnablePeerAccess(gpu::Platform* platform, int device_count) { + for (int i = 0; i < device_count; ++i) { + for (int j = 0; j < device_count; ++j) { + gpu::StreamExecutor* from = platform->ExecutorForDevice(i).ValueOrDie(); + gpu::StreamExecutor* to = platform->ExecutorForDevice(j).ValueOrDie(); + + if (from->CanEnablePeerAccessTo(to)) { + auto status = from->EnablePeerAccessTo(to); + if (!status.ok()) { + return errors::Internal(status.ToString()); + } + } else { + LOG(INFO) << "cannot enable peer access from device ordinal " << i + << " to device ordinal " << j; + } + } + } + return Status::OK(); +} + +static void InitGPU() { + auto result = gpu::MultiPlatformManager::PlatformWithName("CUDA"); + if (!result.ok()) { + LOG(WARNING) + << "Not initializing the GPU, could not create GPU MachineManager. " + << "Error: " << result.status(); + return; + } + + gpu::Platform* platform = result.ValueOrDie(); + + int dev_count = platform->VisibleDeviceCount(); + + if (dev_count == 0) { + LOG(INFO) << "No GPU devices available on machine."; + return; + } + + for (int i = 0; i < dev_count; ++i) { + auto stream_exec = platform->ExecutorForDevice(i).ValueOrDie(); + int64 free_bytes; + int64 total_bytes; + if (!stream_exec->DeviceMemoryUsage(&free_bytes, &total_bytes)) { + // Logs internally on failure. + free_bytes = 0; + total_bytes = 0; + } + const auto& description = stream_exec->GetDeviceDescription(); + int cc_major; + int cc_minor; + if (!description.cuda_compute_capability(&cc_major, &cc_minor)) { + // Logs internally on failure. 
+ cc_major = 0; + cc_minor = 0; + } + LOG(INFO) << "Found device " << i << " with properties: " + << "\nname: " << description.name() << "\nmajor: " << cc_major + << " minor: " << cc_minor << " memoryClockRate (GHz) " + << description.clock_rate_ghz() << "\npciBusID " + << description.pci_bus_id() << "\nTotal memory: " + << strings::HumanReadableNumBytes(total_bytes) + << "\nFree memory: " + << strings::HumanReadableNumBytes(free_bytes); + } + + // Enable peer access + + auto status = EnablePeerAccess(platform, dev_count); + if (!status.ok()) { + LOG(FATAL) << "could not enable peer access for GPU devices: " << status; + } + + // Print out a matrix showing which devices can DMA to one + // another. + auto access_map = GetPeerAccessMap(platform, dev_count); + string line_buf = "DMA: "; + for (int i = 0; i < dev_count; ++i) { + strings::StrAppend(&line_buf, i, " "); + } + LOG(INFO) << line_buf; + for (int i = 0; i < dev_count; ++i) { + line_buf = strings::StrCat(i, ": "); + for (int j = 0; j < dev_count; ++j) { + if ((*access_map)[{i, j}]) { + line_buf.append("Y "); + } else { + line_buf.append("N "); + } + } + LOG(INFO) << line_buf; + } +} + +static bool InitModule() { + InitGPU(); + return true; +} + +} // namespace + +gpu::Platform* GPUMachineManager() { + // Create the machine manager singleton and initialize the GPUs only + // once. + static bool init = InitModule(); + CHECK(init); // Avoids compiler warning that init is unused. + + auto result = gpu::MultiPlatformManager::PlatformWithName("CUDA"); + if (!result.ok()) { + return nullptr; + } + + return result.ValueOrDie(); +} + +} // namespace tensorflow diff --git a/tensorflow/core/common_runtime/gpu/gpu_init.h b/tensorflow/core/common_runtime/gpu/gpu_init.h new file mode 100644 index 0000000000..d126a8b1ca --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/gpu_init.h @@ -0,0 +1,19 @@ +#ifndef TENSORFLOW_COMMON_RUNTIME_GPU_GPU_INIT_H_ +#define TENSORFLOW_COMMON_RUNTIME_GPU_GPU_INIT_H_ + +namespace perftools { +namespace gputools { +class Platform; +} // namespace gputools +} // namespace perftools + +namespace tensorflow { + +// Returns the GPU machine manager singleton, creating it and +// initializing the GPUs on the machine if needed the first time it is +// called. 
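// Returns nullptr if the CUDA platform could not be initialized. Typical use
// elsewhere in this change looks roughly like the following sketch, where
// 'device_ordinal' is a placeholder:
//   perftools::gputools::Platform* platform = GPUMachineManager();
//   perftools::gputools::StreamExecutor* se =
//       platform->ExecutorForDevice(device_ordinal).ValueOrDie();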
+perftools::gputools::Platform* GPUMachineManager(); + +} // namespace tensorflow + +#endif // TENSORFLOW_COMMON_RUNTIME_GPU_GPU_INIT_H_ diff --git a/tensorflow/core/common_runtime/gpu/gpu_region_allocator.cc b/tensorflow/core/common_runtime/gpu/gpu_region_allocator.cc new file mode 100644 index 0000000000..08ff55e221 --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/gpu_region_allocator.cc @@ -0,0 +1,371 @@ +#include "tensorflow/core/common_runtime/gpu/gpu_region_allocator.h" + +//#include "base/commandlineflags.h" +#include "tensorflow/stream_executor/multi_platform_manager.h" +#include "tensorflow/core/common_runtime/gpu/gpu_allocator_retry.h" +#include "tensorflow/core/common_runtime/gpu/gpu_init.h" +#include "tensorflow/core/lib/core/bits.h" +#include "tensorflow/core/lib/gtl/stl_util.h" +#include "tensorflow/core/lib/strings/numbers.h" +#include "tensorflow/core/lib/strings/str_util.h" +#include "tensorflow/core/lib/strings/strcat.h" +#include "tensorflow/core/platform/logging.h" +#include "tensorflow/core/platform/port.h" + +#if defined(PLATFORM_GOOGLE) +DEFINE_bool(brain_gpu_region_allocator_heap_check_on_destruction, true, + "If true, the CUDA gpu manager checks that all allocated " + "memory through the GPU memory pool implementation has been " + "freed."); + +DEFINE_int64(brain_gpu_region_allocator_region_size, 0, + "If > 0, sets the default chunk-size allocatable from GPU memory. " + "Else defaults to entire GPU memory."); + +#else +bool FLAGS_brain_gpu_region_allocator_heap_check_on_destruction = true; +tensorflow::int64 FLAGS_brain_gpu_region_allocator_region_size = 0; +#endif + +namespace gpu = ::perftools::gputools; + +namespace tensorflow { + +GPURegionAllocator::GPURegionAllocator(int device_id, size_t total_bytes) + : device_id_(device_id), total_bytes_(total_bytes) { + // Get a pointer to the stream_executor for this device + stream_exec_ = GPUMachineManager()->ExecutorForDevice(device_id).ValueOrDie(); + + // Set the region size based on explicit user request, or based on + // total GPU capacity. 
+ if (FLAGS_brain_gpu_region_allocator_region_size > 0) { + region_size_ = FLAGS_brain_gpu_region_allocator_region_size; + } else { + region_size_ = static_cast<size_t>(total_bytes_); + } + + LOG(INFO) << "Setting region size to " << region_size_; +} + +GPURegionAllocator::~GPURegionAllocator() { + if (FLAGS_brain_gpu_region_allocator_heap_check_on_destruction) { + CheckForMemoryLeaks(); + } + + gtl::STLDeleteValues(&chunk_map_); + + for (auto r : regions_) { + gpu::DeviceMemoryBase gpu_ptr{r->ptr}; + stream_exec_->Deallocate(&gpu_ptr); + delete r; + } +} + +void* GPURegionAllocator::AllocateRaw(size_t alignment, size_t num_bytes) { + static const int64 kMaxMillisToWait = 10000; // 10 seconds + return retry_helper_.AllocateRaw( + [this](size_t a, size_t nb, bool v) { + return AllocateRawInternal(a, nb, v); + }, + kMaxMillisToWait, alignment, num_bytes); +} + +void* GPURegionAllocator::AllocateRawInternal(size_t alignment, + size_t num_bytes, + bool dump_log_on_failure) { + if (num_bytes == 0) { + LOG(ERROR) << "tried to allocate 0 bytes"; + return nullptr; + } + size_t chunk_size = ChunkSize(num_bytes); + + VLOG(2) << "chunk_size " << chunk_size << " from num_bytes " + << strings::HumanReadableNumBytes(num_bytes); + mutex_lock l(lock_); + Pool* pool = &pools_[chunk_size]; + if (pool->num_free == 0) { + if (!ExpandPool(pool, chunk_size, num_bytes, dump_log_on_failure)) { + if (dump_log_on_failure) { + LOG(WARNING) << "Out of GPU memory, see memory state dump above"; + } + return nullptr; + } + } + CHECK_LT(0, pool->num_free); + CHECK(pool->first); + CHECK(pool->last); + Chunk* c = pool->first; + CHECK(c); + CHECK(!c->in_use); + + c->in_use = true; + // Move c to the back of the queue. + if (c->next != nullptr) { + pool->first = c->next; + pool->first->prev = nullptr; + c->next = nullptr; + } + + if (pool->last != c) { + pool->last->next = c; + c->prev = pool->last; + pool->last = c; + } + pool->num_free--; + pool->cumulative_malloced++; + + void* rv = c->ptr; + c->bytes_allocated = num_bytes; + + VLOG(2) << "new ptr " << rv; + return rv; +} + +void GPURegionAllocator::DeallocateRaw(void* ptr) { + retry_helper_.DeallocateRaw([this](void* p) { DeallocateRawInternal(p); }, + ptr); +} + +void GPURegionAllocator::DeallocateRawInternal(void* ptr) { + VLOG(2) << "DeallocateRaw: " << ptr; + if (ptr == nullptr) { + LOG(ERROR) << "tried to deallocate nullptr"; + return; + } + + mutex_lock l(lock_); + ChunkMap::const_iterator iter = chunk_map_.find(ptr); + CHECK(iter != chunk_map_.end()); + + Chunk* c = iter->second; + VLOG(2) << "chunk of size " << c->size << " at " << c; + + Pool* pool = &(pools_[c->size]); + // Move chunk to head of queue, and mark free. + DCHECK(c->in_use); + c->in_use = false; + if (c->prev) c->prev->next = c->next; + if (c->next) c->next->prev = c->prev; + if (pool->first == c) pool->first = c->next; + if (pool->last == c) pool->last = c->prev; + c->next = pool->first; + c->prev = nullptr; + if (c->next) c->next->prev = c; + pool->first = c; + if (pool->last == nullptr) pool->last = c; + pool->num_free++; + pool->cumulative_freed++; +} + +bool GPURegionAllocator::ExpandPool(Pool* pool, size_t chunk_size, + size_t requested_size, + bool dump_log_on_failure) { + VLOG(1) << "ExpandPool of " << chunk_size << " from " << pool->num_chunks + << " current members"; + DCHECK_NE(0, chunk_size); + // If chunk_size is < 4096, double the pool size. Otherwise + // just increase by one. 
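// Concretely (editorial note on the code below): a pool that already has
// chunks is grown by its current chunk count, i.e. roughly doubled; an empty
// pool starts with 4096 / chunk_size chunks for small sizes, or a single
// chunk when chunk_size exceeds 4096. Any single expansion is capped at about
// 1 MiB of new chunks (or one chunk, if a single chunk is already larger than
// that).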
+ int num_chunks = pool->num_chunks; + if (num_chunks == 0) { + if (chunk_size > 4096) { + num_chunks = 1; + } else { + num_chunks = 4096 / chunk_size; + } + } + // For larger chunks, limit the amount of expansion. + size_t aggregate_size = num_chunks * chunk_size; + if (aggregate_size > (1 << 20)) { + num_chunks = static_cast<int>( + std::max(static_cast<size_t>(1), (1 << 20) / chunk_size)); + } + while (num_chunks > 0) { + Region* r = (regions_.empty() ? nullptr : regions_.back()); + if (r == nullptr || + (((r->ptr + r->size) - r->next) < static_cast<int64>(chunk_size))) { + // Current region is not large enough to accommodate another chunk. + while (r == nullptr || (((r->ptr + r->size) - r->next) < + static_cast<int64>(chunk_size))) { + // Get another region. + size_t this_region_size = std::max(region_size_, chunk_size); + + // Check if we would exceed our limit. + if (allocated_memory_ + this_region_size > total_bytes_) { + if (dump_log_on_failure) DumpMemoryLog(); + return false; + } + + // Perform the allocation, still checking that the allocator + // has not run out of memory. + gpu::DeviceMemory<char> gpu_mem = + stream_exec_->AllocateArray<char>(this_region_size); + if (gpu_mem == nullptr) { + if (dump_log_on_failure) DumpMemoryLog(); + return false; + } + + // We never release memory once expanded. + allocated_memory_ += this_region_size; + + Region* nr = new Region; + nr->ptr = static_cast<char*>(gpu_mem.opaque()); + + if (VLOG_IS_ON(2)) { + int64 free_bytes; + int64 total_bytes; + if (stream_exec_->DeviceMemoryUsage(&free_bytes, &total_bytes)) { + VLOG(2) << "free " << free_bytes << " total " << total_bytes; + } else { + // Note: stream_exec call also logs internally on failure. + VLOG(2) << "could not retrieve memory usage"; + } + } + VLOG(1) << "new Region of size " << this_region_size << " at " + << static_cast<void*>(nr->ptr) << " on device " << device_id_; + r = nr; + r->size = this_region_size; + r->next = r->ptr; + regions_.push_back(r); + + for (auto visitor : region_visitors_) { + visitor(r->ptr, r->size); + } + } + } else { + // Allocate a new chunk and push on front of Pool. + Chunk* c = new Chunk; + c->ptr = r->next; + chunk_map_[c->ptr] = c; + c->size = chunk_size; + r->next += chunk_size; + c->next = pool->first; + if (c->next != nullptr) c->next->prev = c; + pool->first = c; + if (pool->last == nullptr) pool->last = c; + pool->num_chunks++; + pool->num_free++; + --num_chunks; + } + } + + return true; +} + +void GPURegionAllocator::CheckForMemoryLeaks() { + std::vector<string> errors; + mutex_lock l(lock_); // could use reader lock + for (auto pool_map : pools_) { + const Pool& p = pool_map.second; + Chunk* curr_chunk = p.first; + while (curr_chunk != nullptr) { + if (curr_chunk->in_use) { + errors.push_back( + strings::StrCat("Unfreed chunk of size ", curr_chunk->size)); + } + curr_chunk = curr_chunk->next; + } + } + if (!errors.empty()) { + LOG(FATAL) << "GPU Memory leaks:\n" << str_util::Join(errors, "\n"); + } +} + +// Since there's no merging of chunks once allocated, we want to +// maximize their reusablity (which argues for fewer, larger sizes), +// while minimizing waste (which argues for tight-fitting sizes). +// +// The smallest unit of allocation is 256 bytes. +// NOTE(tucker): akrizhevsky says that nvidia's memory manager always +// aligns to 256 bytes, and doing so results in significant speedup. +// +// Up to 2^16 bytes we only allocate in powers of 2. 
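// Worked values for ChunkSize() below (editorial illustration):
//   ChunkSize(100)    == 256     (minimum unit)
//   ChunkSize(300)    == 512     (next power of two)
//   ChunkSize(100000) == 106496  (rounded up to a multiple of
//                                 max_waste = 8192, so at most ~8 KiB
//                                 of the chunk is wasted)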
+// +// Above that, we pick a max-waste which is the largest power +// of 2 <= 1/16 of the requested size, then round up to the nearest +// multiple of max_waste. +// +// static +size_t GPURegionAllocator::ChunkSize(size_t bytes) { + if (bytes <= 256) { + return 256; + } else if (bytes <= (1 << 16)) { + return 1uLL << Log2Ceiling64(bytes); + } else { + // 1/16th of requested size + size_t max_waste = 1uLL << (Log2Ceiling64(bytes) - 4); + return (bytes + max_waste) & (~(max_waste - 1)); + } +} + +void GPURegionAllocator::AddAllocVisitor(Visitor visitor) { + VLOG(1) << "AddVisitor"; + mutex_lock l(lock_); + region_visitors_.push_back(visitor); + for (auto region : regions_) { + visitor(region->ptr, region->size); + } +} + +void GPURegionAllocator::DumpMemoryLog() { + size_t region_bytes = 0; + for (auto r : regions_) { + region_bytes += r->size; + } + size_t chunk_bytes = 0; + std::vector<size_t> chunk_sizes; + for (auto i : pools_) { + chunk_sizes.push_back(i.first); + } + std::sort(chunk_sizes.begin(), chunk_sizes.end()); + for (auto i : chunk_sizes) { + int32 chunks_in_use = 0; + const Pool& p = pools_[i]; + chunk_bytes += i * p.num_chunks; + + if (p.num_chunks > 0) { + // Iterate backwards (allocated chunks are last). + Chunk* curr_chunk = p.last; + while (curr_chunk != nullptr) { + if (curr_chunk->in_use) { + ++chunks_in_use; + } + curr_chunk = curr_chunk->prev; + if (curr_chunk == p.first) { + break; + } + } + } + + LOG(INFO) << "Chunk size: " << i << " (" + << strings::HumanReadableNumBytes(i) << ") Pool: " << p.ToString() + << "\nNumber of chunks: " << p.num_chunks + << ", in_use chunks: " << chunks_in_use; + } + + LOG(INFO) << "Aggregate Region Memory: " << region_bytes << " (" + << strings::HumanReadableNumBytes(region_bytes) << ")"; + LOG(INFO) << "Aggregate Chunk Memory: " << chunk_bytes << " (" + << strings::HumanReadableNumBytes(chunk_bytes) << ")"; +} + +bool GPURegionAllocator::TracksAllocationSizes() { return true; } + +size_t GPURegionAllocator::RequestedSize(void* ptr) { + mutex_lock l(lock_); + auto it = chunk_map_.find(ptr); + CHECK(it != chunk_map_.end()) + << "Asked for requested size of pointer we never allocated: " << ptr; + auto c = it->second; + return c->bytes_allocated; +} + +size_t GPURegionAllocator::AllocatedSize(void* ptr) { + mutex_lock l(lock_); + auto it = chunk_map_.find(ptr); + CHECK(it != chunk_map_.end()) + << "Asked for allocated size of pointer we never allocated: " << ptr; + auto c = it->second; + return c->size; +} + +} // namespace tensorflow diff --git a/tensorflow/core/common_runtime/gpu/gpu_region_allocator.h b/tensorflow/core/common_runtime/gpu/gpu_region_allocator.h new file mode 100644 index 0000000000..1a250b6ede --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/gpu_region_allocator.h @@ -0,0 +1,146 @@ +#ifndef TENSORFLOW_COMMON_RUNTIME_GPU_GPU_REGION_ALLOCATOR_H_ +#define TENSORFLOW_COMMON_RUNTIME_GPU_GPU_REGION_ALLOCATOR_H_ + +#include <memory> +#include <string> +#include <unordered_map> +#include <vector> + +#include "tensorflow/stream_executor/stream_executor.h" +#include "tensorflow/core/common_runtime/gpu/gpu_allocator_retry.h" +#include "tensorflow/core/common_runtime/gpu/visitable_allocator.h" +#include "tensorflow/core/lib/strings/strcat.h" +#include "tensorflow/core/platform/port.h" +#include "tensorflow/core/platform/thread_annotations.h" + +namespace tensorflow { + +class GPURegionAllocator : public VisitableAllocator { + public: + // 'device_id' must be a valid device on the machine. 
+ // + // total_bytes is how many bytes this allocator should allocate up + // to. This may be less than the total available. + explicit GPURegionAllocator(int device_id, size_t total_bytes); + ~GPURegionAllocator() override; + + string Name() override { return "gpu_region"; } + void* AllocateRaw(size_t alignment, size_t num_bytes) override; + void DeallocateRaw(void* ptr) override; + void AddAllocVisitor(Visitor visitor) override; + // Does nothing, because regions are never freed. + void AddFreeVisitor(Visitor visitor) override {} + + bool TracksAllocationSizes() override; + size_t RequestedSize(void* ptr) override; + size_t AllocatedSize(void* ptr) override; + + private: + // A Chunk is the header on a single piece of memory given back + // in response to an AllocateRaw() call. + struct Chunk { + char* ptr; // pointer to granted GPU buffer. + size_t size; // Full size of GPU buffer. + size_t bytes_allocated; // Bytes asked for by client. + bool in_use; + Chunk* prev; // Used for chaining in pool. + Chunk* next; + Chunk() + : ptr(nullptr), + size(0), + bytes_allocated(0), + in_use(false), + prev(nullptr), + next(nullptr) {} + }; + + // A Pool is a collection of same-sized Chunks. + struct Pool { + int num_chunks; // total chunks in this pool + int num_free; // total free chunks in this pool + int64 cumulative_malloced; // number of chunks malloced so far + int64 cumulative_freed; // number of chunks freed so far + + // double-linked ring of chunks; all free chunks precede all + // granted chunks + Chunk* first; + Chunk* last; + Pool() + : num_chunks(0), + num_free(0), + cumulative_malloced(0), + cumulative_freed(0), + first(nullptr), + last(nullptr) {} + + string ToString() const { + return strings::StrCat("chunks: ", num_chunks, " free: ", num_free, + " cumulative malloc: ", cumulative_malloced, + " cumulative freed: ", cumulative_freed); + } + }; + + // A Region is a single area of GPU memory that has been + // reserved by this class and carved up into Chunks. + struct Region { + char* ptr; // base GPU ptr + char* next; // frontier of unused part of region + size_t size; + Region() : ptr(nullptr), size(0) {} + }; + + // Calculate size of chunk for an allocation of this size. + // Min chunk size is 16, for alignment. + // For larger sizes, we round up somewhat so there are fewer + // size-specific pools. + static size_t ChunkSize(size_t bytes); + + void* AllocateRawInternal(size_t alignment, size_t num_bytes, + bool dump_log_on_failure); + void DeallocateRawInternal(void* ptr); + + bool ExpandPool(Pool* p, size_t chunk_size, size_t requested_size, + bool dump_log_on_failure) EXCLUSIVE_LOCKS_REQUIRED(lock_); + + // Inspects region maps and crashes with debug information if there + // are any memory leaks as detected by the region allocator. + void CheckForMemoryLeaks() LOCKS_EXCLUDED(lock_); + + void DumpMemoryLog() EXCLUSIVE_LOCKS_REQUIRED(lock_); + + perftools::gputools::StreamExecutor* stream_exec_; // Not owned. + + typedef std::unordered_map<size_t, Pool> PoolMap; + typedef std::unordered_map<void*, Chunk*> ChunkMap; + + GPUAllocatorRetry retry_helper_; + mutable mutex lock_; + PoolMap pools_ GUARDED_BY(lock_); + + // Owns regions. + std::vector<Region*> regions_ GUARDED_BY(lock_); + + // Maps from GPU ptr to Chunk owning it. + // + // Owns chunks. + ChunkMap chunk_map_ GUARDED_BY(lock_); + + // Called once on each region, ASAP. 
+ std::vector<Visitor> region_visitors_ GUARDED_BY(lock_); + + const int device_id_; + + // Total amount of memory (in bytes) available to this Allocator + const size_t total_bytes_; + + // Total amount of memory allocated to regions. + size_t allocated_memory_ = 0; + + size_t region_size_ = 0; + + TF_DISALLOW_COPY_AND_ASSIGN(GPURegionAllocator); +}; + +} // namespace tensorflow + +#endif // TENSORFLOW_COMMON_RUNTIME_GPU_GPU_REGION_ALLOCATOR_H_ diff --git a/tensorflow/core/common_runtime/gpu/gpu_region_allocator_test.cc b/tensorflow/core/common_runtime/gpu/gpu_region_allocator_test.cc new file mode 100644 index 0000000000..07b0dd57f6 --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/gpu_region_allocator_test.cc @@ -0,0 +1,71 @@ +#if GOOGLE_CUDA + +#include "tensorflow/core/common_runtime/gpu/gpu_region_allocator.h" + +#include <algorithm> +#include <vector> + +#include "tensorflow/core/platform/port.h" +#include "tensorflow/core/lib/gtl/inlined_vector.h" +#include "tensorflow/core/platform/logging.h" +#include "tensorflow/core/common_runtime/gpu/gpu_init.h" +#include "tensorflow/stream_executor/stream_executor.h" +#include <gtest/gtest.h> + +namespace gpu = ::perftools::gputools; + +namespace tensorflow { +namespace { + +TEST(GPURegionAllocatorTest, Simple) { + GPURegionAllocator a(0, 1 << 26); + std::vector<void*> ptrs; + for (int s = 1; s < 1024; s++) { + void* raw = a.AllocateRaw(1, s); + ptrs.push_back(raw); + } + std::sort(ptrs.begin(), ptrs.end()); + for (int i = 0; i < ptrs.size(); i++) { + if (i > 0) { + CHECK_NE(ptrs[i], ptrs[i - 1]); // No dups + } + a.DeallocateRaw(ptrs[i]); + } + float* t1 = a.Allocate<float>(1024); + double* t2 = a.Allocate<double>(1048576); + a.Deallocate(t1); + a.Deallocate(t2); +} + +TEST(GPURegionAllocatorTest, CheckMemLeak) { + EXPECT_DEATH( + { + GPURegionAllocator a(0, 1 << 26); + float* t1 = a.Allocate<float>(1024); + if (t1) { + LOG(INFO) << "Not deallocating"; + } + }, + ""); +} + +TEST(GPURegionAllocatorTest, TracksSizes) { + GPURegionAllocator a(0, 1 << 26); + EXPECT_EQ(true, a.TracksAllocationSizes()); +} + +TEST(GPURegionAllocatorTest, AllocatedVsRequested) { + GPURegionAllocator a(0, 1 << 26); + float* t1 = a.Allocate<float>(1); + EXPECT_EQ(sizeof(float), a.RequestedSize(t1)); + + // Minimum allocation size if 256 + EXPECT_EQ(256, a.AllocatedSize(t1)); + + a.Deallocate(t1); +} + +} // namespace +} // namespace tensorflow + +#endif // GOOGLE_CUDA diff --git a/tensorflow/core/common_runtime/gpu/gpu_stream_util.cc b/tensorflow/core/common_runtime/gpu/gpu_stream_util.cc new file mode 100644 index 0000000000..ca86c7fa06 --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/gpu_stream_util.cc @@ -0,0 +1,97 @@ +#include "tensorflow/core/common_runtime/gpu/gpu_stream_util.h" + +#include <set> +#include <string> +#include <unordered_set> +#include <vector> + +#include "tensorflow/core/graph/algorithm.h" +#include "tensorflow/core/lib/core/errors.h" +#include "tensorflow/core/lib/strings/strcat.h" + +namespace tensorflow { +namespace gpu_stream_util { + +Status AssignStreams(const Graph* graph, const AssignStreamsOpts& opts, + std::unordered_map<int, int>* node_to_stream_id) { + VLOG(1) << "AssignStreams"; + Status status; + + // Sanity check arguments. 
+  if (graph == nullptr)
+    status.Update(errors::InvalidArgument("Bad graph argument supplied."));
+  if (node_to_stream_id == nullptr) {
+    status.Update(
+        errors::InvalidArgument("Bad node_to_stream_id argument supplied."));
+  }
+  if ((opts.max_streams < 1) || (opts.send_stream >= opts.max_streams) ||
+      (opts.recv_stream >= opts.max_streams) ||
+      (opts.const_stream >= opts.max_streams) ||
+      (opts.compute_stream >= opts.max_streams)) {
+    status.Update(errors::InvalidArgument("Bad opts argument supplied."));
+  }
+  TF_RETURN_IF_ERROR(status);
+
+  // Topologically sort the nodes.
+  std::vector<Node*> order;
+  GetReversePostOrder(*graph, &order);
+  if (VLOG_IS_ON(2)) {
+    for (Node* n : order) {
+      const int node_id = n->id();
+      VLOG(2) << "Node " << node_id << " " << n->type_string() << " "
+              << n->name() << " " << n->in_edges().size() << " inputs";
+      for (const Edge* e : n->in_edges()) {
+        VLOG(2) << " Edge from " << e->src()->id() << " " << e->src()->name()
+                << " fanout " << e->src()->out_edges().size();
+      }
+    }
+  }
+  // We perform stream assignment assuming a large number of
+  // stream IDs and then map these down to the required number of streams
+  // using simple round-robin.
+  // Stream Assignment strategy:
+  // 1. Nodes with zero inputs are always executed on a
+  // fresh stream.
+  // 2. Try to execute a node on the same stream as one of its
+  // inputs to avoid inter-stream dependencies.
+  // 3. If any input comes from a node with a large fanout, that is
+  // perhaps an indication that it is shared between parallel
+  // streams of work. We choose a new stream here so that all consumers
+  // of the tensor are likely to run in parallel.
+  int highest_stream_id = -1;
+  for (Node* n : order) {
+    VLOG(3) << "Inspecting node " << n->DebugString();
+    const int node_id = n->id();
+    const string& op = n->type_string();
+
+    // Determine a suitable stream to use.
+    int stream_id = highest_stream_id + 1;
+    for (const Edge* e : n->in_edges()) {
+      const int fanout = e->src()->out_edges().size();
+      if (fanout == 1) {
+        stream_id = (*node_to_stream_id)[e->src()->id()];
+        break;
+      }
+    }
+    // Override stream for specific op types.
+    if (op == "_Send") {
+      if (opts.send_stream >= 0) stream_id = opts.send_stream;
+    } else if (op == "_Recv") {
+      if (opts.recv_stream >= 0) stream_id = opts.recv_stream;
+    } else if (op == "Const") {
+      if (opts.const_stream >= 0) stream_id = opts.const_stream;
+    } else {
+      if (opts.compute_stream >= 0) stream_id = opts.compute_stream;
+    }
+
+    (*node_to_stream_id)[node_id] = stream_id % opts.max_streams;
+    highest_stream_id = std::max(stream_id, highest_stream_id);
+  }
+  VLOG(1) << "Identified " << highest_stream_id << " candidate streams for "
+          << order.size() << " nodes.";
+
+  return Status::OK();
+}
+
+}  // namespace gpu_stream_util
+}  // namespace tensorflow
diff --git a/tensorflow/core/common_runtime/gpu/gpu_stream_util.h b/tensorflow/core/common_runtime/gpu/gpu_stream_util.h
new file mode 100644
index 0000000000..e1c623382c
--- /dev/null
+++ b/tensorflow/core/common_runtime/gpu/gpu_stream_util.h
@@ -0,0 +1,30 @@
+#ifndef TENSORFLOW_COMMON_RUNTIME_GPU_GPU_STREAM_UTIL_H_
+#define TENSORFLOW_COMMON_RUNTIME_GPU_GPU_STREAM_UTIL_H_
+
+#include <unordered_map>
+
+#include "tensorflow/core/graph/graph.h"
+#include "tensorflow/core/public/status.h"
+
+namespace tensorflow {
+namespace gpu_stream_util {
+
+struct AssignStreamsOpts {
+  int32 max_streams = 1;
+  // The following options specify a stream to use for specific op
+  // types.
The value -1 allows ops to be assigned to any stream. + int32 send_stream = -1; + int32 recv_stream = -1; + int32 const_stream = -1; + int32 compute_stream = -1; +}; + +// Given the input graph, assigns every node in the graph with a +// stream_id that should be used. +Status AssignStreams(const Graph* graph, const AssignStreamsOpts& opts, + std::unordered_map<int, int>* node_to_stream_id); + +} // namespace gpu_stream_util +} // namespace tensorflow + +#endif // TENSORFLOW_COMMON_RUNTIME_GPU_GPU_STREAM_UTIL_H_ diff --git a/tensorflow/core/common_runtime/gpu/gpu_stream_util_test.cc b/tensorflow/core/common_runtime/gpu/gpu_stream_util_test.cc new file mode 100644 index 0000000000..5c426caaef --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/gpu_stream_util_test.cc @@ -0,0 +1,137 @@ +#include "tensorflow/core/common_runtime/gpu/gpu_stream_util.h" + +#include <gtest/gtest.h> +#include "tensorflow/cc/ops/array_ops.h" +#include "tensorflow/cc/ops/sendrecv_ops.h" +#include "tensorflow/cc/ops/standard_ops.h" +#include "tensorflow/core/framework/op.h" +#include "tensorflow/core/framework/types.pb.h" +#include "tensorflow/core/graph/graph_def_builder.h" +#include "tensorflow/core/graph/node_builder.h" +#include "tensorflow/core/kernels/ops_testutil.h" +#include "tensorflow/core/kernels/ops_util.h" +#include "tensorflow/core/lib/core/status_test_util.h" + +namespace tensorflow { +namespace { + +class GpuStreamUtilTest : public OpsTestBase { + protected: + void SetUp() override { RequireDefaultOps(); } +}; + +TEST_F(GpuStreamUtilTest, BogusOpts) { + GraphDefBuilder b(GraphDefBuilder::kFailImmediately); + Graph g(OpRegistry::Global()); + ASSERT_OK(b.ToGraph(&g)); + std::unordered_map<int, int> node_to_stream_id; + gpu_stream_util::AssignStreamsOpts opts; + Status status; + status = gpu_stream_util::AssignStreams(nullptr, opts, &node_to_stream_id); + EXPECT_FALSE(status.ok()); + status = gpu_stream_util::AssignStreams(&g, opts, nullptr); + EXPECT_FALSE(status.ok()); + opts.max_streams = 0; + status = gpu_stream_util::AssignStreams(&g, opts, &node_to_stream_id); + EXPECT_FALSE(status.ok()); + opts.max_streams = 1; + opts.compute_stream = 5; + status = gpu_stream_util::AssignStreams(&g, opts, &node_to_stream_id); + EXPECT_FALSE(status.ok()); +} + +TEST_F(GpuStreamUtilTest, EmptyGraph) { + GraphDefBuilder b(GraphDefBuilder::kFailImmediately); + Graph g(OpRegistry::Global()); + ASSERT_OK(b.ToGraph(&g)); + std::unordered_map<int, int> node_to_stream_id; + gpu_stream_util::AssignStreamsOpts opts; + ASSERT_OK(gpu_stream_util::AssignStreams(&g, opts, &node_to_stream_id)); + EXPECT_EQ(2, node_to_stream_id.size()); // _SOURCE and _SINK +} + +TEST_F(GpuStreamUtilTest, SimpleGraphOneStream) { + GraphDefBuilder b(GraphDefBuilder::kFailImmediately); + ops::MatMul(ops::Const(Tensor(DT_FLOAT), b.opts()), + ops::Const(Tensor(DT_FLOAT), b.opts()), b.opts()); + Graph g(OpRegistry::Global()); + ASSERT_OK(b.ToGraph(&g)); + + std::unordered_map<int, int> node_to_stream_id; + gpu_stream_util::AssignStreamsOpts opts; + ASSERT_OK(gpu_stream_util::AssignStreams(&g, opts, &node_to_stream_id)); + + // There should be 5 nodes assigned. + EXPECT_EQ(5, node_to_stream_id.size()); + + // All of them should have stream 0. 
+ for (const auto& it : node_to_stream_id) { + EXPECT_EQ(0, it.second); + } +} + +TEST_F(GpuStreamUtilTest, SimpleGraphManyStreams) { + GraphDefBuilder b(GraphDefBuilder::kFailImmediately); + ops::MatMul(ops::Const(Tensor(DT_FLOAT), b.opts()), + ops::Const(Tensor(DT_FLOAT), b.opts()), b.opts()); + Graph g(OpRegistry::Global()); + ASSERT_OK(b.ToGraph(&g)); + + std::unordered_map<int, int> node_to_stream_id; + gpu_stream_util::AssignStreamsOpts opts; + opts.max_streams = 3; + ASSERT_OK(gpu_stream_util::AssignStreams(&g, opts, &node_to_stream_id)); + + // There should be 5 nodes assigned. + EXPECT_EQ(5, node_to_stream_id.size()); + + // All of them should have a stream in the range [0..max_streams). + for (const auto& it : node_to_stream_id) { + EXPECT_GE(it.second, 0); + EXPECT_LT(it.second, opts.max_streams); + } +} + +TEST_F(GpuStreamUtilTest, StreamOverrides) { + GraphDefBuilder b(GraphDefBuilder::kFailImmediately); + ops::_Recv(DT_FLOAT, "input", "/cpu:0", 0, "/gpu:0", + b.opts().WithName("input")); + auto n = ops::MatMul(ops::Const(Tensor(DT_FLOAT), b.opts()), + ops::Const(Tensor(DT_FLOAT), b.opts()), b.opts()); + ops::_Send(n, "output", "/gpu:0", 0, "/cpu:0", b.opts().WithName("output")); + Graph g(OpRegistry::Global()); + ASSERT_OK(b.ToGraph(&g)); + + // Perform stream assignment using a large number of streams, but with + // op types constrained to specific streams. + std::unordered_map<int, int> node_to_stream_id; + gpu_stream_util::AssignStreamsOpts opts; + opts.max_streams = 100; + opts.const_stream = 90; + opts.send_stream = 91; + opts.recv_stream = 92; + opts.compute_stream = 93; + ASSERT_OK(gpu_stream_util::AssignStreams(&g, opts, &node_to_stream_id)); + + // There should be 7 nodes assigned. + EXPECT_EQ(7, node_to_stream_id.size()); // including _SOURCE and _SINK + + // Nodes should be assigned to streams by op type. + for (const auto& it : node_to_stream_id) { + Node* n = g.FindNodeId(it.first); + const string op = n->type_string(); + const int stream = it.second; + if (op == "Const") { + EXPECT_EQ(stream, 90); + } else if (op == "_Send") { + EXPECT_EQ(stream, 91); + } else if (op == "_Recv") { + EXPECT_EQ(stream, 92); + } else { // Compute. 
+ EXPECT_EQ(stream, 93); + } + } +} + +} // namespace +} // namespace tensorflow diff --git a/tensorflow/core/common_runtime/gpu/gpu_util.cc b/tensorflow/core/common_runtime/gpu/gpu_util.cc new file mode 100644 index 0000000000..a6a3ce01fc --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/gpu_util.cc @@ -0,0 +1,345 @@ +#include "tensorflow/core/common_runtime/gpu/gpu_util.h" + +//#include "base/commandlineflags.h" +#include "tensorflow/core/common_runtime/device.h" +#include "tensorflow/core/common_runtime/gpu_device_context.h" +#include "tensorflow/core/framework/types.h" +#include "tensorflow/core/lib/core/errors.h" +#include "tensorflow/core/lib/core/refcount.h" +#include "tensorflow/core/lib/gtl/array_slice.h" +#include "tensorflow/core/lib/gtl/stl_util.h" +#include "tensorflow/core/lib/hash/hash.h" +#include "tensorflow/core/lib/strings/strcat.h" +#include "tensorflow/core/lib/strings/stringprintf.h" +#include "tensorflow/core/platform/logging.h" +#include "tensorflow/core/platform/tensor_coding.h" +#include "tensorflow/core/platform/tracing.h" +#include "tensorflow/core/public/tensor.h" +#include "tensorflow/core/common_runtime/gpu/dma_helper.h" +#include "tensorflow/core/common_runtime/gpu/gpu_event_mgr.h" +#include "tensorflow/core/common_runtime/gpu/process_state.h" +#include "tensorflow/core/util/util.h" +#include "tensorflow/stream_executor/stream.h" +#include "tensorflow/stream_executor/stream_executor.h" + +#include "tensorflow/core/platform/stream_executor_util.h" + +#if defined(PLATFORM_GOOGLE) +DEFINE_int64(brain_gpu_util_debug_string_maxlen, 128, + "When dumping gpu memory, prints up to this many bytes."); + +DECLARE_bool(record_mem_types); +#else +tensorflow::int64 FLAGS_brain_gpu_util_debug_string_maxlen = 128; +bool FLAGS_EXPERIMENTAL_brain_gpu_multi_stream = false; +extern bool FLAGS_record_mem_types; +#endif + +using perftools::gputools::DeviceMemoryBase; +using perftools::gputools::DeviceMemory; +using perftools::gputools::Stream; + +namespace tensorflow { + +namespace gpu = ::perftools::gputools; + +/*static*/ +void GPUUtil::SetProtoFromGPU(const Tensor& tensor, Device* dev, + const DeviceContext* device_context, + TensorProto* proto, bool is_dead, + StatusCallback done) { + VLOG(1) << "SetProtoFromGPU device_context " << device_context; + // Tensor values need to be copied from GPU to CPU ram so that + // we can build the protobuf response for a RecvTensor RPC. + // "device context" identifies the stream where the _Send op executed. + CHECK(device_context); + gpu::Stream* stream = + static_cast<const GPUDeviceContext*>(device_context)->stream(); + + if (!DMAHelper::CanUseDMA(&tensor)) { + done(errors::Internal(strings::StrCat( + "GPU copy from non-DMA ", DataTypeString(tensor.dtype()), "tensor"))); + return; + } + proto->set_dtype(tensor.dtype()); + tensor.shape().AsProto(proto->mutable_tensor_shape()); + // Prepare a Cord with the right data buf size, and DMA the + // data over from the GPU buffer. Note that 0-size tensors + // do not have a backing buffer. + const size_t num_bytes = is_dead ? 
0 : tensor.TotalBytes(); + if (num_bytes > 0) { + port::Tracing::ScopedAnnotation annotation("SetProtoFromGPU"); + Allocator* alloc = ProcessState::singleton()->GetCUDAHostAllocator(0); + char* mb = alloc->Allocate<char>(num_bytes); + const char* src_ptr = + reinterpret_cast<const char*>(DMAHelper::base(&tensor)); + DeviceMemoryBase gpu_src_ptr(const_cast<char*>(src_ptr), num_bytes); + stream->ThenMemcpy(mb, gpu_src_ptr, num_bytes); + // Use of tensor may outlive stack scope, so keep a ref. + Tensor* tensor_ref = new Tensor(tensor); + dev->tensorflow_gpu_device_info()->event_mgr->ThenExecute( + stream, [stream, done, proto, mb, num_bytes, alloc, tensor_ref]() { + if (!stream->ok()) { + done(errors::Internal("SetProtoFromGPU: GPU Memcpy failed")); + // TODO(pbar) We currently have no way to recover the + // worker from a GPU stream in the error state. Until + // there is a way to reset the CUDA driver, it is + // preferable to crash the process and restart. Tracked + // under b/23717097 + LOG(FATAL) << "SetProtoFromGPU: GPU Memcpy failed"; + return; + } + delete tensor_ref; + port::CopyFromArray(proto->mutable_tensor_content(), mb, num_bytes); + alloc->Deallocate<char>(mb); + done(Status::OK()); + }); + } else { + done(Status::OK()); + } +} + +typedef ProcessState::MemDesc PMD; + +/*static*/ +void GPUUtil::CopyViaDMA(const string& edge_name, + DeviceContext* send_dev_context, + DeviceContext* recv_dev_context, Device* src, + Device* dst, AllocatorAttributes src_alloc_attr, + AllocatorAttributes dst_alloc_attr, + const Tensor* input, Tensor* output, + StatusCallback done) { + port::Tracing::ScopedAnnotation annotation(edge_name); + VLOG(1) << "CopyViaDMA " << edge_name; + size_t total_bytes = input->TotalBytes(); + // Note that 0-size tensors have no backing buffer. + if (total_bytes > 0) { + const void* src_ptr = DMAHelper::base(input); + void* dst_ptr = DMAHelper::base(output); + VLOG(2) << "src_ptr " << src_ptr << " dst_ptr " << dst_ptr; + if (FLAGS_record_mem_types) { + ProcessState::MemDesc smd = ProcessState::singleton()->PtrType(src_ptr); + ProcessState::MemDesc dmd = ProcessState::singleton()->PtrType(dst_ptr); + VLOG(0) << "Src " << smd.DebugString() << " Dst " << dmd.DebugString(); + if (smd.loc == PMD::CPU && dmd.loc == PMD::GPU && (!smd.gpu_registered)) { + LOG(WARNING) << "CPU -> GPU no reg for " << edge_name; + } + if (dmd.loc == PMD::CPU && smd.loc == PMD::GPU && (!dmd.gpu_registered)) { + LOG(WARNING) << "GPU -> CPU no reg for " << edge_name; + } + } + + auto src_device_type = src->attributes().device_type(); + auto dst_device_type = dst->attributes().device_type(); + + bool non_cpu_src = (!src_alloc_attr.on_host() && + src_device_type != DeviceType(DEVICE_CPU).type()); + bool non_cpu_dst = (!dst_alloc_attr.on_host() && + dst_device_type != DeviceType(DEVICE_CPU).type()); + if (non_cpu_src) { + gpu::Stream* stream = send_dev_context->stream(); + if (stream == nullptr) { + done(errors::Internal("Failed to find device stream")); + return; + } + auto* src_dev_info = src->tensorflow_gpu_device_info(); + CHECK(src_dev_info); + + if (non_cpu_dst) { + // Device to device copy + DeviceMemoryBase gpu_dst_ptr(dst_ptr, total_bytes); + stream->ThenMemcpy( + &gpu_dst_ptr, + DeviceMemoryBase{const_cast<void*>(src_ptr), total_bytes}, + total_bytes); + if (dst_device_type == DeviceType(DEVICE_GPU).type()) { + // Use of input may outlive stack scope, so keep a ref. 
+ Tensor* input_ref = new Tensor(*input); + src_dev_info->event_mgr->ThenExecute( + stream, [done, stream, input_ref]() { + delete input_ref; + if (!stream->ok()) { + done(errors::Internal("GPU->GPU Memcpy failed")); + } else { + done(Status::OK()); + } + }); + } + send_dev_context->MaintainLifetimeOnStream(input, stream); + } else { + // Device to host copy. + return send_dev_context->CopyDeviceTensorToCPU(input, edge_name, src, + output, done); + } + } else if (non_cpu_dst) { + // Host to Device copy. + // Note that this is already an async copy. + recv_dev_context->CopyCPUTensorToDevice(input, dst, output, done); + } else { + memcpy(dst_ptr, src_ptr, total_bytes); + done(Status::OK()); + } + } else { + // buffer is empty + done(Status::OK()); + } +} + +void GPUUtil::CopyGPUTensorToCPU(Device* gpu_device, + const DeviceContext* device_context, + const Tensor* gpu_tensor, Tensor* cpu_tensor, + StatusCallback done) { + VLOG(1) << "CopyGPUTensorToCPU"; + size_t total_bytes = gpu_tensor->TotalBytes(); + // Note that 0-size tensors have no backing buffer. + if (total_bytes > 0) { + const void* src_ptr = DMAHelper::base(gpu_tensor); + void* dst_ptr = DMAHelper::base(cpu_tensor); + CHECK(dst_ptr); + auto* stream = gpu_device->tensorflow_gpu_device_info()->stream; + if (device_context) { + stream = static_cast<const GPUDeviceContext*>(device_context)->stream(); + } + stream->ThenMemcpy( + dst_ptr, DeviceMemoryBase{const_cast<void*>(src_ptr), total_bytes}, + total_bytes); + stream->BlockHostUntilDone(); + if (!stream->ok()) { + done(errors::Internal("CopyGPUTensorToCPU: GPU->CPU Memcpy failed")); + return; + } + } + + done(Status::OK()); +} + +/* static */ +void GPUUtil::CopyCPUTensorToGPU(const Tensor* cpu_tensor, + const DeviceContext* device_context, + Device* gpu_device, Tensor* gpu_tensor, + StatusCallback done) { + VLOG(1) << "CopyCPUTensorToGPU"; + CHECK(DeviceType(gpu_device->attributes().device_type()) == + DeviceType(DEVICE_GPU)); + + auto* dev_info = gpu_device->tensorflow_gpu_device_info(); + if (!dev_info) { + done(errors::Internal("Failed to find dest device GPUDeviceInfo")); + return; + } + if (cpu_tensor->TotalBytes() != gpu_tensor->TotalBytes()) { + done(errors::Internal( + strings::StrCat("Can't copy ", cpu_tensor->TotalBytes(), + " bytes of a tensor into another with ", + gpu_tensor->TotalBytes(), " bytes buffer."))); + return; + } + const int64 total_bytes = cpu_tensor->TotalBytes(); + // Note that 0-size tensors have no backing buffer. + if (total_bytes > 0) { + const void* src_ptr = DMAHelper::base(cpu_tensor); + void* dst_ptr = DMAHelper::base(gpu_tensor); + DeviceMemoryBase gpu_dst_ptr(dst_ptr, total_bytes); + + CHECK(device_context); + auto* stream = + static_cast<const GPUDeviceContext*>(device_context)->stream(); + stream->ThenMemcpy(&gpu_dst_ptr, src_ptr, total_bytes); + auto* dev_info = gpu_device->tensorflow_gpu_device_info(); + // Use of cpu_tensor may outlive stack scope, so keep a ref. 
+ Tensor* input_ref = new Tensor(*cpu_tensor); + dev_info->event_mgr->ThenExecute(stream, [stream, done, input_ref]() { + delete input_ref; + if (!stream->ok()) { + done(errors::Internal("CopyCPUTensorToGPU: GPU Memcpy failed")); + } else { + done(Status::OK()); + } + }); + } else { + // empty tensor case + done(Status::OK()); + } +} + +Status GPUUtil::Sync(Device* gpu_device) { + VLOG(1) << "GPUUtil::Sync"; + auto* dev_info = gpu_device->tensorflow_gpu_device_info(); + if (!dev_info) { + return errors::Internal("Failed to find dest device GPUDeviceInfo"); + } + dev_info->stream->BlockHostUntilDone(); + if (!dev_info->stream->ok()) { + LOG(FATAL) << "GPU sync failed"; + } + return Status::OK(); +} + +Status GPUUtil::SyncAll(Device* gpu_device) { + VLOG(1) << "GPUUtil::SyncAll"; + auto* dev_info = gpu_device->tensorflow_gpu_device_info(); + if (!dev_info) { + return errors::Internal("Failed to find dest device GPUDeviceInfo"); + } + if (!dev_info->stream->parent()->SynchronizeAllActivity() || + !dev_info->stream->ok()) { + LOG(FATAL) << "GPU sync failed"; + } + return Status::OK(); +} + +string GPUUtil::MemoryDebugString(const Device* device, Tensor* tensor) { + string ret; + CHECK(tensor); + const int64 num_bytes = std::min<int64>( + FLAGS_brain_gpu_util_debug_string_maxlen, tensor->TotalBytes()); + void* ptr = (num_bytes > 0) ? DMAHelper::base(tensor) : nullptr; + strings::Appendf(&ret, "%p:", ptr); + if (num_bytes > 0) { + auto* dev_info = device->tensorflow_gpu_device_info(); + if (!dev_info) { + strings::StrAppend( + &ret, PrintMemory(reinterpret_cast<const char*>(ptr), num_bytes)); + } else { + string buf; + buf.resize(num_bytes); + DeviceMemoryBase gpu_ptr(ptr, num_bytes); + Status s = dev_info->stream->parent()->SynchronousMemcpyD2H( + gpu_ptr, num_bytes, gtl::string_as_array(&buf)); + strings::StrAppend(&ret, + PrintMemory(gtl::string_as_array(&buf), num_bytes)); + } + } + return ret; +} + +// TODO(pbar) Checksum is called from places without a valid device context. +uint64 GPUUtil::Checksum(Device* gpu_device, + const DeviceContext* device_context, + const Tensor& tensor) { + Tensor copy(tensor.dtype(), tensor.shape()); + Status s; + Notification n; + CopyGPUTensorToCPU(gpu_device, device_context, &tensor, ©, + [&s, &n](Status status) { + s.Update(status); + n.Notify(); + }); + n.WaitForNotification(); + CHECK(s.ok()) << s; + return Checksum(copy); +} + +uint64 GPUUtil::Checksum(const Tensor& tensor) { + const float* fptr = reinterpret_cast<const float*>(DMAHelper::base(&tensor)); + size_t num_bytes = tensor.TotalBytes(); + size_t num_floats = num_bytes / sizeof(float); + for (size_t i = 0; i < num_floats; ++i) { + CHECK(!std::isnan(fptr[i])) << " i " << i; + } + // TODO(tucker): consider using crc32c instead. 
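
As a sketch of what that TODO might look like (illustrative only; it assumes the crc32c helpers from tensorflow/core/lib/hash/crc32c.h are available and is not part of this change):

    // Hypothetical alternative body: a crc32c over the raw tensor bytes
    // instead of Hash64. crc32c::Value returns a uint32, widened here to
    // preserve the existing uint64 return type.
    const char* data = reinterpret_cast<const char*>(DMAHelper::base(&tensor));
    return static_cast<uint64>(crc32c::Value(data, tensor.TotalBytes()));
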
+  return Hash64(reinterpret_cast<const char*>(DMAHelper::base(&tensor)),
+                tensor.TotalBytes(), 0);
+}
+
+}  // namespace tensorflow
diff --git a/tensorflow/core/common_runtime/gpu/gpu_util.h b/tensorflow/core/common_runtime/gpu/gpu_util.h
new file mode 100644
index 0000000000..1d8c3a054d
--- /dev/null
+++ b/tensorflow/core/common_runtime/gpu/gpu_util.h
@@ -0,0 +1,89 @@
+#ifndef TENSORFLOW_COMMON_RUNTIME_GPU_GPU_UTIL_H_
+#define TENSORFLOW_COMMON_RUNTIME_GPU_GPU_UTIL_H_
+
+#include "tensorflow/core/common_runtime/device.h"
+#include "tensorflow/core/public/tensor.h"
+#include "tensorflow/core/public/status.h"
+#include "tensorflow/core/common_runtime/gpu/dma_helper.h"
+#include "tensorflow/stream_executor/device_memory.h"
+
+#include "tensorflow/stream_executor/stream.h"
+
+namespace tensorflow {
+
+class RecvTensorResponse;
+class TensorProto;
+
+namespace gpu = ::perftools::gputools;
+
+class GPUUtil {
+ public:
+  // "tensor" is GPU-local. "dev" is the hosting GPU.
+  // "device_context" should be the context of the GPU "_Send" op
+  // which provides the Tensor.
+  // Sets all necessary fields of "proto" by transferring value
+  // bytes from GPU to CPU RAM. "is_dead" indicates that the
+  // tensor is dead with an uninitialized value.
+  static void SetProtoFromGPU(const Tensor& tensor, Device* dev,
+                              const DeviceContext* device_context,
+                              TensorProto* proto, bool is_dead,
+                              StatusCallback done);
+
+  // Copies "input" to "output" between devices accessible to the
+  // local process via some DMA-like method. "edge_name" is the name
+  // of the tensor being copied, for debugging purposes. Depending on
+  // the type of devices and memory in use, the copy may be performed
+  // synchronously or asynchronously. 'done' will be invoked only
+  // after the copy is actually complete.
+  static void CopyViaDMA(const string& edge_name,
+                         DeviceContext* send_dev_context,
+                         DeviceContext* recv_dev_context, Device* src,
+                         Device* dst, const AllocatorAttributes src_alloc_attr,
+                         const AllocatorAttributes dst_alloc_attr,
+                         const Tensor* input, Tensor* output,
+                         StatusCallback done);
+
+  // Copies the data in 'gpu_tensor' into 'cpu_tensor'.
+  // 'gpu_tensor''s backing memory must be on 'gpu_device' and
+  // 'cpu_tensor' must be allocated to be of the same size as
+  // 'gpu_tensor'. Synchronous: may block.
+  static void CopyGPUTensorToCPU(Device* gpu_device,
+                                 const DeviceContext* device_context,
+                                 const Tensor* gpu_tensor, Tensor* cpu_tensor,
+                                 StatusCallback done);
+
+  // Blocks until all operations queued on the stream associated with
+  // "gpu_device" at the time of the call have completed. Returns any
+  // error pending on the stream at completion.
+  static Status Sync(Device* gpu_device);
+
+  // Blocks until all operations queued on all streams associated with the
+  // corresponding GPU device at the time of call have completed.
+  // Returns any error pending on the stream at completion.
+  static Status SyncAll(Device* gpu_device);
+
+  // For debugging purposes, given a "device" and a "tensor" allocated
+  // on the device, return a string printing each byte in the tensor
+  // (up to a limit). "device" can be either a CPU or a GPU device.
+  static string MemoryDebugString(const Device* device, Tensor* tensor);
+
+  static perftools::gputools::DeviceMemory<float> AsGPUFloat(const Tensor& t);
+
+  // Computes a checksum over the contents of "tensor", which is allocated
+  // on "gpu_device".
+ static uint64 Checksum(Device* gpu_device, + const DeviceContext* device_context, + const Tensor& tensor); + + // Computes a checksum over the contents of "tensor", which is allocated + // in local CPU RAM. + static uint64 Checksum(const Tensor& tensor); + + static void CopyCPUTensorToGPU(const Tensor* cpu_tensor, + const DeviceContext* device_context, + Device* gpu_device, Tensor* gpu_tensor, + StatusCallback done); +}; + +} // namespace tensorflow +#endif // TENSORFLOW_COMMON_RUNTIME_GPU_GPU_UTIL_H_ diff --git a/tensorflow/core/common_runtime/gpu/gpu_util_platform_specific.cc b/tensorflow/core/common_runtime/gpu/gpu_util_platform_specific.cc new file mode 100644 index 0000000000..f1b1174a28 --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/gpu_util_platform_specific.cc @@ -0,0 +1,24 @@ +#include "tensorflow/core/common_runtime/device.h" +#include "tensorflow/core/common_runtime/gpu_device_context.h" +#include "tensorflow/core/framework/types.h" +#include "tensorflow/core/public/tensor.h" +#include "tensorflow/core/common_runtime/gpu/gpu_util.h" +#include "tensorflow/stream_executor/stream.h" + +namespace tensorflow { + +void GPUDeviceContext::CopyCPUTensorToDevice(const Tensor* cpu_tensor, + Device* device, + Tensor* device_tensor, + StatusCallback done) const { + GPUUtil::CopyCPUTensorToGPU(cpu_tensor, this, device, device_tensor, done); +} + +void GPUDeviceContext::CopyDeviceTensorToCPU(const Tensor* device_tensor, + const string& tensor_name, + Device* device, Tensor* cpu_tensor, + StatusCallback done) { + GPUUtil::CopyGPUTensorToCPU(device, this, device_tensor, cpu_tensor, done); +} + +} // namespace tensorflow diff --git a/tensorflow/core/common_runtime/gpu/pool_allocator.cc b/tensorflow/core/common_runtime/gpu/pool_allocator.cc new file mode 100644 index 0000000000..52deb7fce2 --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/pool_allocator.cc @@ -0,0 +1,269 @@ +#include "tensorflow/core/common_runtime/gpu/pool_allocator.h" + +#include <errno.h> +#include <strings.h> +#include <sys/mman.h> // for munmap + +#include <map> + +#include "tensorflow/core/lib/strings/numbers.h" +#include "tensorflow/core/platform/logging.h" +#include "tensorflow/core/platform/port.h" +//#include "prodkernel/api/base/numa.h" + +namespace tensorflow { + +PoolAllocator::PoolAllocator(size_t pool_size_limit, bool auto_resize, + SubAllocator* allocator, + RoundUpInterface* size_rounder, string name) + : name_(name), + has_size_limit_(pool_size_limit > 0), + auto_resize_(auto_resize), + pool_size_limit_(pool_size_limit), + allocator_(allocator), + size_rounder_(size_rounder), + allocation_begun_(false) { + if (auto_resize) { + CHECK_LT(0, pool_size_limit) + << "size limit must be > 0 if auto_resize is true."; + } +} + +PoolAllocator::~PoolAllocator() { Clear(); } + +namespace { +// Pools contain Chunks allocatated from the underlying Allocator. +// Chunk alignment is always on kPoolAlignment boundaries. Each Chunk +// begins with a descriptor (ChunkPrefix) that gives its size and a +// pointer to itself. The pointer returned to the user is just past +// the ChunkPrefix. If the user asks for a larger alignment, we will +// increase the size of the chunk, then adjust the returned user +// pointer and also re-write the ChunkPrefix.chunk_ptr value +// immediately before it. This way the Chunk address and size can be +// recovered from the returned user pointer, regardless of alignment. +// Note that this deferencing of the pointers means that we cannot +// handle GPU memory, only CPU memory. 
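
To make that layout concrete, here is a sketch of an over-aligned request (sizes assume a typical 64-bit build where sizeof(ChunkPrefix) == 16; the picture is illustrative, not part of the change):

    // Request: num_bytes = 100, alignment = 64 (> kPoolAlignment).
    // AllocateRaw() grows the request by `alignment` bytes before rounding,
    // so the chunk is large enough to slide the user pointer forward:
    //
    //   chunk -> [ ChunkPrefix | padding ... | user data ............ ]
    //            ^                           ^
    //            chunk start                 user_ptr (64-byte aligned)
    //
    // PrepareChunk() then rewrites the chunk_ptr field of the ChunkPrefix
    // sitting immediately before user_ptr, so FindPrefix(user_ptr) can walk
    // back to the true chunk start when DeallocateRaw() is called.
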
+struct ChunkPrefix { + size_t num_bytes; + void* chunk_ptr; +}; +// kPoolAlignment cannot be less than the size of ChunkPrefix. +static const int kPoolAlignment = sizeof(ChunkPrefix); + +void* PrepareChunk(void* chunk, size_t alignment, size_t num_bytes) { + ChunkPrefix* cp = reinterpret_cast<ChunkPrefix*>(chunk); + cp->num_bytes = num_bytes; + cp->chunk_ptr = chunk; + void* user_ptr = reinterpret_cast<void*>(cp + 1); + if (alignment > kPoolAlignment) { + // Move user_ptr forward to the first satisfying offset, and write + // chunk_ptr just before it. + size_t aligned_ptr = reinterpret_cast<size_t>(user_ptr) + alignment; + user_ptr = reinterpret_cast<void*>(aligned_ptr & ~(alignment - 1)); + (reinterpret_cast<ChunkPrefix*>(user_ptr) - 1)->chunk_ptr = chunk; + } + // Safety check that user_ptr is always past the ChunkPrefix. + CHECK_GE(user_ptr, reinterpret_cast<ChunkPrefix*>(chunk) + 1); + return user_ptr; +} + +ChunkPrefix* FindPrefix(void* user_ptr) { + ChunkPrefix* cp = reinterpret_cast<ChunkPrefix*>(user_ptr) - 1; + return reinterpret_cast<ChunkPrefix*>(cp->chunk_ptr); +} +} // namespace + +void* PoolAllocator::AllocateRaw(size_t alignment, size_t num_bytes) { + if (!allocation_begun_) allocation_begun_ = true; + if (num_bytes == 0) return nullptr; + + // If alignment is larger than kPoolAlignment, increase num_bytes so that we + // are guaranteed to be able to return an aligned ptr by advancing user_ptr + // without overrunning the end of the chunk. + if (alignment > kPoolAlignment) { + num_bytes += alignment; + } + num_bytes += sizeof(ChunkPrefix); + num_bytes = size_rounder_->RoundUp(num_bytes); + PtrRecord* pr = nullptr; + if (has_size_limit_) { + { + mutex_lock lock(mutex_); + auto iter = pool_.find(num_bytes); + if (iter == pool_.end()) { + allocated_count_++; + // Deliberately fall out of lock scope before + // calling the allocator. No further modification + // to the pool will be performed. + } else { + get_from_pool_count_++; + pr = iter->second; + RemoveFromList(pr); + pool_.erase(iter); + // Fall out of lock scope and do the result without the lock held. 
+ } + } + } + if (pr != nullptr) { + void* r = pr->ptr; + delete pr; + return PrepareChunk(r, alignment, num_bytes); + } else { + void* ptr = allocator_->Alloc(kPoolAlignment, num_bytes); + for (auto v : alloc_visitors_) { + v(ptr, num_bytes); + } + return PrepareChunk(ptr, alignment, num_bytes); + } +} + +void PoolAllocator::DeallocateRaw(void* ptr) { + if (ptr == nullptr) return; + ChunkPrefix* cp = FindPrefix(ptr); + CHECK_LE((void*)cp, (void*)ptr); + if (!has_size_limit_ && !auto_resize_) { + for (auto v : free_visitors_) { + v(cp, cp->num_bytes); + } + allocator_->Free(cp, cp->num_bytes); + } else { + mutex_lock lock(mutex_); + ++put_count_; + while (pool_.size() >= pool_size_limit_) { + EvictOne(); + } + PtrRecord* pr = new PtrRecord; + pr->num_bytes = cp->num_bytes; + pr->ptr = cp; + AddToList(pr); + pool_.insert(std::make_pair(cp->num_bytes, pr)); + } +} + +void PoolAllocator::Clear() { + if (has_size_limit_) { + mutex_lock lock(mutex_); + for (auto iter : pool_) { + PtrRecord* pr = iter.second; + for (auto v : free_visitors_) { + v(pr->ptr, pr->num_bytes); + } + allocator_->Free(pr->ptr, pr->num_bytes); + delete pr; + } + pool_.clear(); + get_from_pool_count_ = 0; + put_count_ = 0; + allocated_count_ = 0; + evicted_count_ = 0; + lru_head_ = nullptr; + lru_tail_ = nullptr; + } +} + +void PoolAllocator::RemoveFromList(PtrRecord* pr) { + if (pr->prev == nullptr) { + DCHECK_EQ(lru_head_, pr); + lru_head_ = nullptr; + } else { + pr->prev->next = pr->next; + } + if (pr->next == nullptr) { + DCHECK_EQ(lru_tail_, pr); + lru_tail_ = pr->prev; + } else { + pr->next->prev = pr->prev; + if (lru_head_ == nullptr) { + lru_head_ = pr->next; + } + } +} + +void PoolAllocator::AddToList(PtrRecord* pr) { + pr->prev = nullptr; + if (lru_head_ == nullptr) { + CHECK(lru_tail_ == nullptr); + lru_tail_ = pr; + pr->next = nullptr; + } else { + pr->next = lru_head_; + pr->next->prev = pr; + } + lru_head_ = pr; +} + +void PoolAllocator::EvictOne() { + DCHECK(lru_tail_ != nullptr); + PtrRecord* prec = lru_tail_; + RemoveFromList(prec); + auto iter = pool_.find(prec->num_bytes); + while (iter->second != prec) { + ++iter; + DCHECK(iter != pool_.end()); + } + pool_.erase(iter); + for (auto v : free_visitors_) { + v(prec->ptr, prec->num_bytes); + } + allocator_->Free(prec->ptr, prec->num_bytes); + delete prec; + ++evicted_count_; + // Auto-resizing, and warning messages. + static const double kTolerable = 2e-3; + static const int kCheckInterval = 1000; + static const double kIncreaseFactor = 1.1; + static const int kMinPoolSize = 100; + if (0 == evicted_count_ % kCheckInterval) { + const double eviction_rate = + evicted_count_ / static_cast<double>(put_count_); + const int64 alloc_request_count = allocated_count_ + get_from_pool_count_; + const double alloc_rate = + allocated_count_ / static_cast<double>(alloc_request_count); + static int log_counter = 0; + // (counter increment not thread safe but it's just for logging, so we + // don't care). + bool should_log = ((log_counter++ % 10) == 0); + if (should_log) { + LOG(WARNING) << "PoolAllocator: After " << alloc_request_count + << " get requests, put_count=" << put_count_ + << " evicted_count=" << evicted_count_ + << " eviction_rate=" << eviction_rate + << " and unsatisfied allocation rate=" << alloc_rate; + } + if (auto_resize_ && (eviction_rate > kTolerable) && + (alloc_rate > kTolerable)) { + size_t new_size_limit = (pool_size_limit_ < kMinPoolSize) + ? 
kMinPoolSize + : (kIncreaseFactor * pool_size_limit_); + if (should_log) { + LOG(INFO) << "Raising pool_size_limit_ from " << pool_size_limit_ + << " to " << new_size_limit; + } + pool_size_limit_ = new_size_limit; + // Reset all the counters so that ratios are relative to new sizes + // at next test interval. + put_count_ = 0; + allocated_count_ = 0; + evicted_count_ = 0; + get_from_pool_count_ = 0; + } + } +} + +void PoolAllocator::AddAllocVisitor(Visitor visitor) { + mutex_lock lock(mutex_); + CHECK(!allocation_begun_) + << "AddAllocVisitor may not be called after pool allocation " + << "has begun."; + alloc_visitors_.push_back(visitor); +} + +void PoolAllocator::AddFreeVisitor(Visitor visitor) { + mutex_lock lock(mutex_); + CHECK(!allocation_begun_) + << "AddFreeVisitor may not be called after pool allocation " + << "has begun."; + free_visitors_.push_back(visitor); +} + +} // namespace tensorflow diff --git a/tensorflow/core/common_runtime/gpu/pool_allocator.h b/tensorflow/core/common_runtime/gpu/pool_allocator.h new file mode 100644 index 0000000000..d10aabe88a --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/pool_allocator.h @@ -0,0 +1,202 @@ +#ifndef TENSORFLOW_COMMON_RUNTIME_GPU_POOL_ALLOCATOR_H_ +#define TENSORFLOW_COMMON_RUNTIME_GPU_POOL_ALLOCATOR_H_ + +// Simple LRU pool allocators for various flavors of CPU RAM that +// implement the VisitableAllocator interface. GPU memory is managed +// by GPURegionAllocator. + +#include <atomic> +#include <map> +#include <memory> +#include "tensorflow/core/lib/core/bits.h" +#include "tensorflow/core/platform/logging.h" +#include "tensorflow/core/platform/port.h" +#include "tensorflow/core/common_runtime/gpu/visitable_allocator.h" +#include "tensorflow/stream_executor/stream_executor.h" + +namespace tensorflow { + +// Interface of an object that does the underlying alloc/free of memory. +class SubAllocator { + public: + virtual ~SubAllocator() {} + virtual void* Alloc(size_t alignment, size_t num_bytes) = 0; + virtual void Free(void* ptr, size_t num_bytes) = 0; +}; + +// Interface of an object that rounds up integers. +class RoundUpInterface { + public: + virtual ~RoundUpInterface() {} + virtual size_t RoundUp(size_t num_bytes) = 0; +}; + +// Size-limited pool of memory buffers obtained from a SubAllocator +// instance. Pool eviction policy is LRU. +class PoolAllocator : public VisitableAllocator { + public: + // "pool_size_limit" is the maximum number of returned, re-usable + // memory buffers to keep in the pool. If pool_size_limit == 0, the + // pool is effectively a thin wrapper around the allocator. + // If "auto_resize" is true, then the pool_size_limit will gradually + // be raised so that deallocations happen very rarely, if at all. + // Transitory start-up objects may deallocate, but the long-term + // working-set should not. Auto-resizing can raise pool_size_limit + // but will never lower it. + // "allocator" is the object that performs the underlying memory + // malloc/free operations. This object takes ownership of allocator. + PoolAllocator(size_t pool_size_limit, bool auto_resize, + SubAllocator* allocator, RoundUpInterface* size_rounder, + string name); + ~PoolAllocator() override; + + string Name() override { return name_; } + + void* AllocateRaw(size_t alignment, size_t num_bytes) override; + + void DeallocateRaw(void* ptr) override; + + // REQUIRES: The following functions may only be called prior + // to the first Allocate*() call. Once allocation has begun, it is + // illegal to register another visitor. 
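
A minimal usage sketch of that ordering requirement (illustrative only; BasicCPUAllocator and Pow2Rounder are the helpers declared further down in this header, the visitor body is hypothetical, and Visitor is assumed to accept any callable taking (void*, size_t)):

    PoolAllocator pool(/*pool_size_limit=*/100, /*auto_resize=*/true,
                       new BasicCPUAllocator, new Pow2Rounder, "example_pool");
    // Visitors must be registered before the first AllocateRaw() call;
    // afterwards the CHECK in AddAllocVisitor() fires.
    pool.AddAllocVisitor([](void* ptr, size_t num_bytes) {
      // e.g. pin or register `ptr` for DMA here (hypothetical).
    });
    void* buf = pool.AllocateRaw(/*alignment=*/16, /*num_bytes=*/1024);
    pool.DeallocateRaw(buf);
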
+ + void AddAllocVisitor(Visitor visitor) override; + + void AddFreeVisitor(Visitor visitor) override; + + // Allocate an unused memory region of size "num_bytes". Fetch from + // the pool if available, otherwise call allocator_. + void* Get(size_t num_bytes); + + // Return a no-longer needed memory region to the pool. It is an error + // to deference "ptr" after this call. If the pool is full, the least + // recently used region will be deallocated. + void Put(void* ptr, size_t num_bytes); + + // Reset the pool to empty. + void Clear(); + + // The following accessors permit monitoring the effectiveness of + // the pool at avoiding repeated malloc/frees on the underlying + // allocator. Read locks are not taken on the theory that value + // consistency with other threads is not important. + + // Number of Get() requests satisfied from pool. + int64 get_from_pool_count() const NO_THREAD_SAFETY_ANALYSIS { + return get_from_pool_count_; + } + // Number of Put() requests. + int64 put_count() const NO_THREAD_SAFETY_ANALYSIS { return put_count_; } + // Number of Get() requests requiring a fresh allocation. + int64 allocated_count() const NO_THREAD_SAFETY_ANALYSIS { + return allocated_count_; + } + // Number of pool evictions. + int64 evicted_count() const NO_THREAD_SAFETY_ANALYSIS { + return evicted_count_; + } + // Current size limit. + size_t size_limit() const NO_THREAD_SAFETY_ANALYSIS { + return pool_size_limit_; + } + + private: + struct PtrRecord { + void* ptr; + size_t num_bytes; + PtrRecord* prev; + PtrRecord* next; + }; + + // Remove "pr" from the double-linked LRU list. + void RemoveFromList(PtrRecord* pr) EXCLUSIVE_LOCKS_REQUIRED(mutex_); + + // Add "pr" to the head of the double-linked LRU list. + void AddToList(PtrRecord* pr) EXCLUSIVE_LOCKS_REQUIRED(mutex_); + + // Delete the least recently used record. + void EvictOne() EXCLUSIVE_LOCKS_REQUIRED(mutex_); + + const string name_; + const bool has_size_limit_; + const bool auto_resize_; + size_t pool_size_limit_; + std::unique_ptr<SubAllocator> allocator_; + std::unique_ptr<RoundUpInterface> size_rounder_; + mutex mutex_; + std::multimap<const size_t, PtrRecord*> pool_ GUARDED_BY(mutex_); + PtrRecord* lru_head_ GUARDED_BY(mutex_) = nullptr; + PtrRecord* lru_tail_ GUARDED_BY(mutex_) = nullptr; + int64 get_from_pool_count_ GUARDED_BY(mutex_) = 0; + int64 put_count_ GUARDED_BY(mutex_) = 0; + int64 allocated_count_ GUARDED_BY(mutex_) = 0; + int64 evicted_count_ GUARDED_BY(mutex_) = 0; + // Write access to these is guarded by mutex_, but not read + // access. They may only be modified prior to the first + // allocation. Later attempts to modify will fail. + std::vector<Visitor> alloc_visitors_; + std::vector<Visitor> free_visitors_; + std::atomic<bool> allocation_begun_; +}; + +// Do-nothing rounder. Passes through sizes unchanged. +class NoopRounder : public RoundUpInterface { + public: + size_t RoundUp(size_t num_bytes) override { return num_bytes; } +}; + +// Power of 2 rounder: rounds up to nearest power of 2 size. 
+class Pow2Rounder : public RoundUpInterface { + public: + size_t RoundUp(size_t num_bytes) override { + return 1uLL << Log2Ceiling64(num_bytes); + } +}; + +class BasicCPUAllocator : public SubAllocator { + public: + ~BasicCPUAllocator() override {} + + void* Alloc(size_t alignment, size_t num_bytes) override { + return port::aligned_malloc(num_bytes, alignment); + } + void Free(void* ptr, size_t num_bytes) override { free(ptr); } +}; + +// Allocator for pinned CPU RAM that is made known to CUDA for the +// purpose of efficient DMA with a GPU. +class CUDAHostAllocator : public SubAllocator { + public: + // Note: stream_exec cannot be null. + explicit CUDAHostAllocator(perftools::gputools::StreamExecutor* stream_exec) + : stream_exec_(stream_exec) { + CHECK(stream_exec_ != nullptr); + } + ~CUDAHostAllocator() override {} + + void* Alloc(size_t alignment, size_t num_bytes) override { + void* ptr = nullptr; + if (num_bytes > 0) { + ptr = stream_exec_->HostMemoryAllocate(num_bytes); + if (ptr == nullptr) { + LOG(FATAL) << "could not allocate pinned host memory of size: " + << num_bytes; + } + } + return ptr; + } + + void Free(void* ptr, size_t num_bytes) override { + if (ptr != nullptr) { + stream_exec_->HostMemoryDeallocate(ptr); + } + } + + private: + perftools::gputools::StreamExecutor* stream_exec_; // not owned, non-null + + TF_DISALLOW_COPY_AND_ASSIGN(CUDAHostAllocator); +}; + +} // namespace tensorflow +#endif // TENSORFLOW_COMMON_RUNTIME_GPU_POOL_ALLOCATOR_H_ diff --git a/tensorflow/core/common_runtime/gpu/pool_allocator_test.cc b/tensorflow/core/common_runtime/gpu/pool_allocator_test.cc new file mode 100644 index 0000000000..ca409b2b4c --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/pool_allocator_test.cc @@ -0,0 +1,203 @@ +#if GOOGLE_CUDA + +#include "tensorflow/core/common_runtime/gpu/pool_allocator.h" + +#include "tensorflow/stream_executor/multi_platform_manager.h" +#include "tensorflow/stream_executor/platform.h" +#include <gtest/gtest.h> + +namespace gpu = ::perftools::gputools; + +namespace tensorflow { +namespace { + +TEST(PoolAllocatorTest, ZeroSizeBuffers) { + gpu::Platform* platform = + gpu::MultiPlatformManager::PlatformWithName("cuda").ValueOrDie(); + PoolAllocator pool( + 2 /*pool_size_limit*/, false /*auto_resize*/, + new CUDAHostAllocator( + platform->GetExecutor(gpu::StreamExecutorConfig(/*ordinal=*/0)) + .ValueOrDie()), + new NoopRounder, "pool"); + + EXPECT_EQ(nullptr, pool.AllocateRaw(4 /*alignment*/, 0 /*num_bytes*/)); + pool.DeallocateRaw(nullptr); // Should not crash. + EXPECT_EQ(0, pool.get_from_pool_count()); + EXPECT_EQ(0, pool.put_count()); + EXPECT_EQ(0, pool.allocated_count()); + EXPECT_EQ(0, pool.evicted_count()); +} + +TEST(PoolAllocatorTest, ZeroSizePool) { + gpu::Platform* platform = + gpu::MultiPlatformManager::PlatformWithName("cuda").ValueOrDie(); + PoolAllocator pool( + 0 /*pool_size_limit*/, false /*auto_resize*/, + new CUDAHostAllocator( + platform->GetExecutor(gpu::StreamExecutorConfig(/*ordinal=*/0)) + .ValueOrDie()), + new NoopRounder, "pool"); + + EXPECT_EQ(0, pool.get_from_pool_count()); + EXPECT_EQ(0, pool.put_count()); + EXPECT_EQ(0, pool.allocated_count()); + EXPECT_EQ(0, pool.evicted_count()); + + // All allocations should bypass the pool and return valid pointers. 
+ for (int i = 0; i < 3; ++i) { + void* p0 = pool.AllocateRaw(4, 0); + void* p4 = pool.AllocateRaw(4, 4); + void* p12 = pool.AllocateRaw(4, 12); + EXPECT_EQ(nullptr, p0); + EXPECT_NE(nullptr, p4); + EXPECT_NE(nullptr, p12); + pool.DeallocateRaw(p0); + pool.DeallocateRaw(p4); + pool.DeallocateRaw(p12); + } + EXPECT_EQ(0, pool.get_from_pool_count()); + EXPECT_EQ(0, pool.put_count()); + EXPECT_EQ(0, pool.allocated_count()); + EXPECT_EQ(0, pool.evicted_count()); +} + +TEST(PoolAllocatorTest, Alignment) { + gpu::Platform* platform = + gpu::MultiPlatformManager::PlatformWithName("cuda").ValueOrDie(); + PoolAllocator pool( + 0 /*pool_size_limit*/, false /*auto_resize*/, + new CUDAHostAllocator( + platform->GetExecutor(gpu::StreamExecutorConfig(/*ordinal=*/0)) + .ValueOrDie()), + new NoopRounder, "pool"); + for (int i = 0; i < 16; ++i) { + size_t alignment = 1 << i; + void* p = pool.AllocateRaw(alignment, 111); + EXPECT_TRUE(p != nullptr); + EXPECT_EQ(0, reinterpret_cast<int64>(p) & (alignment - 1)) + << "ptr: " << p << " alignment " << alignment; + // Intentionally don't deallocate, to test that destruction of + // the PoolAllocator frees all pending memory. + } +} + +TEST(PoolAllocatorTest, AutoResize) { + PoolAllocator pool(2 /*pool_size_limit*/, true /*auto_resize*/, + new BasicCPUAllocator, new NoopRounder, "pool"); + + // Alloc/dealloc 10 sizes just a few times, confirming pool size + // stays at 2. + for (int i = 0; i < 10; ++i) { + void* p = pool.AllocateRaw(4, 64 << i); + pool.DeallocateRaw(p); + } + EXPECT_EQ(0, pool.get_from_pool_count()); + EXPECT_EQ(10, pool.allocated_count()); + EXPECT_EQ(10, pool.put_count()); + EXPECT_EQ(8, pool.evicted_count()); + EXPECT_EQ(2, pool.size_limit()); + + // Then repeat 1200 times. Pool size limit should jump to 100. + for (int j = 0; j < 120; ++j) { + for (int i = 0; i < 10; ++i) { + void* p = pool.AllocateRaw(4, 64 << i); + pool.DeallocateRaw(p); + } + } + EXPECT_EQ(100, pool.size_limit()); +} + +TEST(PoolAllocatorTest, CudaHostAllocator) { + gpu::Platform* platform = + gpu::MultiPlatformManager::PlatformWithName("cuda").ValueOrDie(); + PoolAllocator pool( + 2 /*pool_size_limit*/, false /*auto_resize*/, + new CUDAHostAllocator( + platform->GetExecutor(gpu::StreamExecutorConfig(/*ordinal=*/0)) + .ValueOrDie()), + new NoopRounder, "pool"); + + // Repeatedly Get a 16-byte value, confirming that there's only + // one real allocation. + void* p1_16 = pool.AllocateRaw(4, 16); + EXPECT_EQ(0, pool.get_from_pool_count()); + EXPECT_EQ(1, pool.allocated_count()); + EXPECT_NE(nullptr, p1_16); + pool.DeallocateRaw(p1_16); + // Pool contents {16} + EXPECT_EQ(1, pool.put_count()); + void* p2_16 = pool.AllocateRaw(4, 16); // Get it again. + EXPECT_EQ(1, pool.get_from_pool_count()); + EXPECT_EQ(1, pool.allocated_count()); + EXPECT_EQ(p1_16, p2_16); // Same pointer value + pool.DeallocateRaw(p2_16); // Put it back. + // Pool contents {16} + EXPECT_EQ(2, pool.put_count()); + + // Get two more values of different sizes. + void* p3_4 = pool.AllocateRaw(4, 4); + EXPECT_EQ(2, pool.allocated_count()); + EXPECT_NE(p1_16, p3_4); // Different pointer value + EXPECT_NE(nullptr, p3_4); + pool.DeallocateRaw(p3_4); // Put it back. Pool is now full. + // Pool contents {4, 16} + EXPECT_EQ(3, pool.put_count()); + void* p4_2 = pool.AllocateRaw(4, 2); // Get a third size buffer. + EXPECT_NE(nullptr, p4_2); + EXPECT_EQ(0, pool.evicted_count()); + + // The pool is full: when we put back p4_2, the 16-byte buffer + // should be evicted since it was least recently inserted. 
+ pool.DeallocateRaw(p4_2); + // Pool contents {2, 4} + EXPECT_EQ(4, pool.put_count()); + EXPECT_EQ(1, pool.evicted_count()); + + // Re-getting and putting size 2 or 4 should not alter pool size or + // num-evicted. + void* p5_4 = pool.AllocateRaw(4, 4); + EXPECT_NE(nullptr, p5_4); + pool.DeallocateRaw(p5_4); + void* p6_2 = pool.AllocateRaw(4, 2); + EXPECT_NE(nullptr, p6_2); + pool.DeallocateRaw(p6_2); + EXPECT_EQ(3, pool.get_from_pool_count()); + EXPECT_EQ(6, pool.put_count()); + EXPECT_EQ(3, pool.allocated_count()); + EXPECT_EQ(1, pool.evicted_count()); + + pool.Clear(); + EXPECT_EQ(0, pool.get_from_pool_count()); + EXPECT_EQ(0, pool.put_count()); + EXPECT_EQ(0, pool.allocated_count()); + EXPECT_EQ(0, pool.evicted_count()); +} + +TEST(PoolAllocatorTest, Pow2Rounder) { + Pow2Rounder rounder; + EXPECT_EQ(1, rounder.RoundUp(1)); + EXPECT_EQ(2, rounder.RoundUp(2)); + EXPECT_EQ(16, rounder.RoundUp(9)); + EXPECT_EQ(16, rounder.RoundUp(16)); + EXPECT_EQ(65536, rounder.RoundUp(41234)); + EXPECT_EQ(65536, rounder.RoundUp(65535)); + EXPECT_EQ(65536, rounder.RoundUp(65536)); +} + +TEST(PoolAllocatorTest, Name) { + gpu::Platform* platform = + gpu::MultiPlatformManager::PlatformWithName("cuda").ValueOrDie(); + PoolAllocator pool( + 2 /*pool_size_limit*/, false /*auto_resize*/, + new CUDAHostAllocator( + platform->GetExecutor(gpu::StreamExecutorConfig(/*ordinal=*/0)) + .ValueOrDie()), + new NoopRounder, "pool"); + EXPECT_EQ("pool", pool.Name()); +} + +} // namespace +} // namespace tensorflow + +#endif // GOOGLE_CUDA diff --git a/tensorflow/core/common_runtime/gpu/process_state.cc b/tensorflow/core/common_runtime/gpu/process_state.cc new file mode 100644 index 0000000000..70ac6130c2 --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/process_state.cc @@ -0,0 +1,220 @@ +#include "tensorflow/core/common_runtime/gpu/process_state.h" + +#include "tensorflow/core/framework/allocator.h" +#include "tensorflow/core/common_runtime/gpu/gpu_init.h" +#include "tensorflow/core/common_runtime/gpu/gpu_bfc_allocator.h" +#include "tensorflow/core/common_runtime/gpu/gpu_debug_allocator.h" +#include "tensorflow/core/common_runtime/gpu/gpu_region_allocator.h" +#include "tensorflow/core/common_runtime/gpu/pool_allocator.h" +#include "tensorflow/core/lib/strings/strcat.h" +#include "tensorflow/core/platform/logging.h" +#include "tensorflow/core/platform/port.h" +#include "tensorflow/stream_executor/multi_platform_manager.h" + +#if defined(PLATFORM_GOOGLE) +DEFINE_bool(record_mem_types, false, + "If true, record attributes of memory allocations and " + "dyanmically check for appropriate use of registered memory." 
+ "Should only be true for debugging or diagnosis of " + "performance issues."); +DEFINE_bool(brain_mem_reg_cuda_dma, true, + "If true, register CPU RAM used to copy to/from GPU RAM " + "with the CUDA driver."); +DEFINE_bool(brain_gpu_use_bfc_allocator, false, + "If true, uses the Best-Fit GPU allocator."); +DEFINE_bool(brain_gpu_region_allocator_debug, false, + "If true, checks for memory overwrites by writing " + "distinctive patterns on both ends of allocated memory."); +DEFINE_bool(brain_gpu_region_allocator_reset_to_nan, false, + "If true, initializes all new Malloc buffers to NaN, " + "and resets the buffer to NaN upon Free."); + +#else +bool FLAGS_record_mem_types = false; +bool FLAGS_brain_mem_reg_cuda_dma = true; +bool FLAGS_brain_gpu_region_allocator_debug = false; +bool FLAGS_brain_gpu_region_allocator_reset_to_nan = false; +bool FLAGS_brain_gpu_use_bfc_allocator = false; +#endif + +namespace gpu = ::perftools::gputools; + +namespace tensorflow { + +ProcessState* ProcessState::instance_ = nullptr; + +/*static*/ ProcessState* ProcessState::singleton() { + if (instance_ == nullptr) { + instance_ = new ProcessState; + } + + return instance_; +} + +ProcessState::ProcessState() : gpu_count_(0) { + CHECK(instance_ == nullptr); + instance_ = this; +} + +ProcessState::~ProcessState() { + for (auto p : gpu_allocators_) { + delete p; + } + instance_ = nullptr; +} + +string ProcessState::MemDesc::DebugString() { + return strings::StrCat((loc == CPU ? "CPU " : "GPU "), dev_index, ", dma: ", + gpu_registered, ", nic: ", nic_registered); +} + +ProcessState::MemDesc ProcessState::PtrType(const void* ptr) { + if (FLAGS_record_mem_types) { + auto iter = mem_desc_map_.find(ptr); + if (iter != mem_desc_map_.end()) { + return iter->second; + } + } + return MemDesc(); +} + +void ProcessState::SetGPUCount(int c) { + CHECK(gpu_count_ == 0 || gpu_count_ == c) + << "Cannot call SetGPUCount with a non-zero value " + << "not equal to prior set value."; + gpu_count_ = c; +} + +int ProcessState::GPUCount() const { return gpu_count_; } + +Allocator* ProcessState::GetGPUAllocator(int gpu_id, size_t total_bytes) { +#if GOOGLE_CUDA + mutex_lock lock(mu_); + gpu::Platform* gpu_platform = GPUMachineManager(); + + // Verify that gpu_id is legitimate. + CHECK_LT(gpu_id, gpu_platform->VisibleDeviceCount()) + << "gpu_id is outside discovered device range"; + + if (gpu_id >= static_cast<int64>(gpu_allocators_.size())) { + gpu_allocators_.resize(gpu_id + 1); + if (FLAGS_record_mem_types) gpu_al_.resize(gpu_id + 1); + } + + if (gpu_allocators_[gpu_id] == nullptr) { + VisitableAllocator* gpu_allocator; + + if (FLAGS_brain_gpu_use_bfc_allocator) { + gpu_allocator = new GPUBFCAllocator(gpu_id, total_bytes); + } else { + gpu_allocator = new GPURegionAllocator(gpu_id, total_bytes); + } + + if (FLAGS_brain_gpu_region_allocator_debug) { + gpu_allocator = new GPUDebugAllocator(gpu_allocator, gpu_id); + } + if (FLAGS_brain_gpu_region_allocator_reset_to_nan) { + gpu_allocator = new GPUNanResetAllocator(gpu_allocator, gpu_id); + } + + gpu_allocators_[gpu_id] = gpu_allocator; + + // If there are any pending AllocVisitors for this bus, add + // them now. 
+ gpu::StreamExecutor* se = + gpu_platform->ExecutorForDevice(gpu_id).ValueOrDie(); + int bus_id = se->GetDeviceDescription().numa_node(); + if (bus_id < static_cast<int64>(gpu_visitors_.size())) { + for (auto v : gpu_visitors_[bus_id]) { + gpu_allocators_[gpu_id]->AddAllocVisitor(v); + } + } + if (FLAGS_record_mem_types) { + MemDesc md; + md.loc = MemDesc::GPU; + md.dev_index = gpu_id; + md.gpu_registered = false; + md.nic_registered = true; + if (static_cast<int64>(gpu_al_.size()) <= gpu_id) + gpu_al_.resize(gpu_id + 1); + gpu_al_[gpu_id] = new internal::RecordingAllocator( + &mem_desc_map_, gpu_allocators_[gpu_id], md, &mu_); + } + } + if (FLAGS_record_mem_types) return gpu_al_[gpu_id]; + return gpu_allocators_[gpu_id]; +#else + LOG(FATAL) << "GPUAllocator unavailable. Not compiled with --config=cuda."; + return nullptr; +#endif // GOOGLE_CUDA +} + +Allocator* ProcessState::GetCPUAllocator(int numa_node) { + // Although we're temporarily ignoring numa_node, check for legality. + CHECK_GE(numa_node, 0); + // TODO(tucker): actually maintain separate CPUAllocators for + // different numa_nodes. For now, just one. + numa_node = 0; + mutex_lock lock(mu_); + while (cpu_allocators_.size() <= static_cast<size_t>(numa_node)) { + cpu_allocators_.push_back(new PoolAllocator( + 100 /*pool_size_limit*/, true /*auto_resize*/, new BasicCPUAllocator(), + new NoopRounder, "cpu_pool")); + } + return cpu_allocators_[0]; +} + +Allocator* ProcessState::GetCUDAHostAllocator(int numa_node) { + if (gpu_count_ == 0 || !FLAGS_brain_mem_reg_cuda_dma) { + return GetCPUAllocator(numa_node); + } + // Although we're temporarily ignoring numa_node, check for legality. + CHECK_GE(numa_node, 0); + // TODO(tucker): actually maintain separate CPUAllocators for + // different numa_nodes. For now, just one. + numa_node = 0; + mutex_lock lock(mu_); + while (static_cast<int>(cuda_host_allocators_.size()) <= numa_node) { + // CUDAHost alloc the same across all gpus, so just get the + // executor for the first device. 
+ gpu::Platform* gpu_platform = GPUMachineManager(); + gpu::StreamExecutor* se = gpu_platform->ExecutorForDevice(0).ValueOrDie(); + CHECK(se); + cuda_host_allocators_.push_back(new PoolAllocator( + 100 /*pool_size_limit*/, true /*auto_resize*/, + new CUDAHostAllocator(se), new Pow2Rounder, "cuda_host")); + if (FLAGS_record_mem_types) { + MemDesc md; + md.loc = MemDesc::CPU; + md.dev_index = 0; + md.gpu_registered = true; + md.nic_registered = false; + cuda_al_.push_back(new internal::RecordingAllocator( + &mem_desc_map_, cuda_host_allocators_.back(), md, &mu_)); + } + } + if (FLAGS_record_mem_types) return cuda_al_[0]; + return cuda_host_allocators_[0]; +} + +void ProcessState::AddGPUAllocVisitor(int bus_id, AllocVisitor visitor) { +#if GOOGLE_CUDA + mutex_lock lock(mu_); + gpu::Platform* gpu_platform = GPUMachineManager(); + for (int gpu_id = 0; gpu_id < static_cast<int64>(gpu_allocators_.size()); + ++gpu_id) { + gpu::StreamExecutor* se = + gpu_platform->ExecutorForDevice(gpu_id).ValueOrDie(); + if (gpu_allocators_[gpu_id] && + se->GetDeviceDescription().numa_node() == bus_id) { + gpu_allocators_[gpu_id]->AddAllocVisitor(visitor); + } + } + while (bus_id >= static_cast<int64>(gpu_visitors_.size())) { + gpu_visitors_.push_back(std::vector<AllocVisitor>()); + } + gpu_visitors_[bus_id].push_back(visitor); +#endif // GOOGLE_CUDA +} + +} // namespace tensorflow diff --git a/tensorflow/core/common_runtime/gpu/process_state.h b/tensorflow/core/common_runtime/gpu/process_state.h new file mode 100644 index 0000000000..527d12c10d --- /dev/null +++ b/tensorflow/core/common_runtime/gpu/process_state.h @@ -0,0 +1,140 @@ +#ifndef TENSORFLOW_COMMON_RUNTIME_GPU_PROCESS_STATE_H_ +#define TENSORFLOW_COMMON_RUNTIME_GPU_PROCESS_STATE_H_ + +#include <functional> +#include <unordered_map> +#include <vector> + +#include "tensorflow/core/framework/allocator.h" +#include "tensorflow/core/platform/port.h" +#include "tensorflow/core/platform/thread_annotations.h" + +namespace tensorflow { + +class Allocator; +class VisitableAllocator; +class PoolAllocator; + +// Singleton that manages per-process state, e.g. allocation +// of shared resources. +class ProcessState { + public: + static ProcessState* singleton(); + + // Descriptor for memory allocation attributes, used by optional + // runtime correctness analysis logic. + struct MemDesc { + enum MemLoc { CPU, GPU }; + MemLoc loc; + int dev_index; + bool gpu_registered; + bool nic_registered; + MemDesc() + : loc(CPU), + dev_index(0), + gpu_registered(false), + nic_registered(false) {} + string DebugString(); + }; + + // Records the number of GPUs available in the local process. + // It is a fatal error to call this with a value != to the value + // in a prior call. + void SetGPUCount(int c); + + // Returns number of GPUs available in local process, as set by + // SetGPUCount(); Returns 0 if SetGPUCount has not been called. + int GPUCount() const; + + // Returns what we know about the memory at ptr. + // If we know nothing, it's called CPU 0 with no other attributes. + MemDesc PtrType(const void* ptr); + + // Returns the one CPUAllocator used for the given numa_node. + // TEMPORY: ignores numa_node. + Allocator* GetCPUAllocator(int numa_node); + + // Returns the one GPU allocator used for the indexed GPU. + // Note that this is a system GPU index, not (necessarily) a brain + // device index. + // + // 'total_bytes' is the total number of bytes that should be made + // available to the allocator. 
diff --git a/tensorflow/core/common_runtime/gpu/process_state.h b/tensorflow/core/common_runtime/gpu/process_state.h
new file mode 100644
index 0000000000..527d12c10d
--- /dev/null
+++ b/tensorflow/core/common_runtime/gpu/process_state.h
@@ -0,0 +1,140 @@
+#ifndef TENSORFLOW_COMMON_RUNTIME_GPU_PROCESS_STATE_H_
+#define TENSORFLOW_COMMON_RUNTIME_GPU_PROCESS_STATE_H_
+
+#include <functional>
+#include <unordered_map>
+#include <vector>
+
+#include "tensorflow/core/framework/allocator.h"
+#include "tensorflow/core/platform/port.h"
+#include "tensorflow/core/platform/thread_annotations.h"
+
+namespace tensorflow {
+
+class Allocator;
+class VisitableAllocator;
+class PoolAllocator;
+
+// Singleton that manages per-process state, e.g. allocation
+// of shared resources.
+class ProcessState {
+ public:
+  static ProcessState* singleton();
+
+  // Descriptor for memory allocation attributes, used by optional
+  // runtime correctness analysis logic.
+  struct MemDesc {
+    enum MemLoc { CPU, GPU };
+    MemLoc loc;
+    int dev_index;
+    bool gpu_registered;
+    bool nic_registered;
+    MemDesc()
+        : loc(CPU),
+          dev_index(0),
+          gpu_registered(false),
+          nic_registered(false) {}
+    string DebugString();
+  };
+
+  // Records the number of GPUs available in the local process.
+  // It is a fatal error to call this with a value different from
+  // the value given in a prior call.
+  void SetGPUCount(int c);
+
+  // Returns the number of GPUs available in the local process, as set by
+  // SetGPUCount(). Returns 0 if SetGPUCount() has not been called.
+  int GPUCount() const;
+
+  // Returns what we know about the memory at ptr.
+  // If we know nothing, it is reported as CPU 0 with no other attributes.
+  MemDesc PtrType(const void* ptr);
+
+  // Returns the one CPUAllocator used for the given numa_node.
+  // TEMPORARY: ignores numa_node.
+  Allocator* GetCPUAllocator(int numa_node);
+
+  // Returns the one GPU allocator used for the indexed GPU.
+  // Note that this is a system GPU index, not (necessarily) a brain
+  // device index.
+  //
+  // 'total_bytes' is the total number of bytes that should be made
+  // available to the allocator. The first call to this function for
+  // a given gpu_id creates the allocator, so only the total_bytes
+  // passed on that first call has any effect.
+  //
+  // REQUIRES: gpu_id must be a valid ordinal for a GPU available in the
+  // current system environment. Otherwise returns nullptr.
+  Allocator* GetGPUAllocator(int gpu_id, size_t total_bytes);
+
+  Allocator* GetCUDAHostAllocator(int numa_node);
+
+  // Registers a function to be called once on every new Region
+  // allocated by every GPURegionAllocator proximate to the specified
+  // bus. The AllocVisitor is provided with a memory pointer and the
+  // size of the area it identifies. The pointer is not guaranteed to
+  // be valid after the call terminates. The intention is for this
+  // interface to be used for network device memory registration.
+  // "bus_id" is platform-specific. On many platforms it
+  // should be 0. On machines with multiple PCIe buses, it should be
+  // the index of one of the PCIe buses. If the bus_id is invalid,
+  // results are undefined.
+  typedef std::function<void(void*, size_t)> AllocVisitor;
+  void AddGPUAllocVisitor(int bus_id, AllocVisitor visitor);
+
+  typedef std::unordered_map<const void*, MemDesc> MDMap;
+
+ protected:
+  ProcessState();
+
+  static ProcessState* instance_;
+
+  mutex mu_;
+  int gpu_count_;
+
+  std::vector<PoolAllocator*> cpu_allocators_ GUARDED_BY(mu_);
+  std::vector<VisitableAllocator*> gpu_allocators_ GUARDED_BY(mu_);
+  std::vector<std::vector<AllocVisitor>> gpu_visitors_ GUARDED_BY(mu_);
+  std::vector<PoolAllocator*> cuda_host_allocators_ GUARDED_BY(mu_);
+
+  virtual ~ProcessState();
+
+  // Optional RecordingAllocators that wrap the corresponding
+  // Allocators for runtime attribute use analysis.
+  MDMap mem_desc_map_;
+  std::vector<Allocator*> cpu_al_ GUARDED_BY(mu_);
+  std::vector<Allocator*> gpu_al_ GUARDED_BY(mu_);
+  std::vector<Allocator*> cuda_al_ GUARDED_BY(mu_);
+};
+
+namespace internal {
+class RecordingAllocator : public Allocator {
+ public:
+  RecordingAllocator(ProcessState::MDMap* mm, Allocator* a,
+                     ProcessState::MemDesc md, mutex* mu)
+      : mm_(mm), a_(a), md_(md), mu_(mu) {}
+
+  string Name() override { return a_->Name(); }
+  void* AllocateRaw(size_t alignment, size_t num_bytes) override {
+    void* p = a_->AllocateRaw(alignment, num_bytes);
+    mutex_lock l(*mu_);
+    (*mm_)[p] = md_;
+    return p;
+  }
+  void DeallocateRaw(void* p) override {
+    mutex_lock l(*mu_);
+    auto iter = mm_->find(p);
+    mm_->erase(iter);
+    a_->DeallocateRaw(p);
+  }
+  bool TracksAllocationSizes() override { return a_->TracksAllocationSizes(); }
+  size_t RequestedSize(void* p) override { return a_->RequestedSize(p); }
+  size_t AllocatedSize(void* p) override { return a_->AllocatedSize(p); }
+  ProcessState::MDMap* mm_;  // not owned
+  Allocator* a_;             // not owned
+  ProcessState::MemDesc md_;
+  mutex* mu_;
+};
+}  // namespace internal
+}  // namespace tensorflow
+#endif  // TENSORFLOW_COMMON_RUNTIME_GPU_PROCESS_STATE_H_
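As a small usage sketch of the MemDesc/PtrType machinery declared above: the MDMap is only populated when allocations go through a RecordingAllocator (i.e. when the memory-type recording flag is on), so for unknown pointers PtrType() falls back to the default "CPU 0" descriptor. The logging helper below is hypothetical, not part of this change.

  #include "tensorflow/core/common_runtime/gpu/process_state.h"
  #include "tensorflow/core/platform/logging.h"

  namespace tensorflow {

  // Hypothetical debugging aid: report what the process knows about a pointer.
  void LogPtrAttributes(const void* ptr) {
    ProcessState::MemDesc md = ProcessState::singleton()->PtrType(ptr);
    LOG(INFO) << "ptr=" << ptr
              << " loc=" << (md.loc == ProcessState::MemDesc::GPU ? "GPU" : "CPU")
              << " dev_index=" << md.dev_index
              << " gpu_registered=" << md.gpu_registered
              << " nic_registered=" << md.nic_registered;
  }

  }  // namespace tensorflow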
diff --git a/tensorflow/core/common_runtime/gpu/visitable_allocator.h b/tensorflow/core/common_runtime/gpu/visitable_allocator.h
new file mode 100644
index 0000000000..23feed9aab
--- /dev/null
+++ b/tensorflow/core/common_runtime/gpu/visitable_allocator.h
@@ -0,0 +1,30 @@
+#ifndef TENSORFLOW_COMMON_RUNTIME_GPU_VISITABLE_ALLOCATOR_H_
+#define TENSORFLOW_COMMON_RUNTIME_GPU_VISITABLE_ALLOCATOR_H_
+
+#include <functional>
+#include "tensorflow/core/framework/allocator.h"
+
+namespace tensorflow {
+
+// Subclass VisitableAllocator instead of Allocator when a memory
+// allocator needs to enable some kind of registration/deregistration
+// of memory areas.
+class VisitableAllocator : public Allocator {
+ public:
+  // Visitor gets called with a pointer to a memory area and its
+  // size in bytes.
+  typedef std::function<void(void*, size_t)> Visitor;
+
+  // Register a visitor guaranteed to be called exactly once on each
+  // chunk of memory newly allocated from the underlying device.
+  // Typically, chunks will be reused and possibly sub-divided by a
+  // pool manager, so the calls will happen only once per process
+  // execution, not once per tensor (re)allocation.
+  virtual void AddAllocVisitor(Visitor visitor) = 0;
+
+  // Register a visitor guaranteed to be called on each chunk of
+  // memory returned to the underlying device.
+  virtual void AddFreeVisitor(Visitor visitor) = 0;
+};
+}  // namespace tensorflow
+#endif  // TENSORFLOW_COMMON_RUNTIME_GPU_VISITABLE_ALLOCATOR_H_
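Finally, a hedged sketch of a trivial VisitableAllocator implementation, purely to illustrate the interface above. It forwards to a wrapped Allocator and invokes visitors on every AllocateRaw/DeallocateRaw call; note that the real GPU allocators in this change invoke alloc visitors once per newly acquired device region, not once per allocation.

  #include <vector>
  #include "tensorflow/core/common_runtime/gpu/visitable_allocator.h"

  namespace tensorflow {

  // Toy wrapper (assumed class, not part of this change).
  class ForwardingVisitableAllocator : public VisitableAllocator {
   public:
    explicit ForwardingVisitableAllocator(Allocator* base) : base_(base) {}

    string Name() override { return base_->Name(); }

    void* AllocateRaw(size_t alignment, size_t num_bytes) override {
      void* p = base_->AllocateRaw(alignment, num_bytes);
      for (const Visitor& v : alloc_visitors_) v(p, num_bytes);
      return p;
    }

    void DeallocateRaw(void* p) override {
      // The true size is only known if the wrapped allocator tracks it.
      size_t num_bytes =
          base_->TracksAllocationSizes() ? base_->RequestedSize(p) : 0;
      for (const Visitor& v : free_visitors_) v(p, num_bytes);
      base_->DeallocateRaw(p);
    }

    void AddAllocVisitor(Visitor visitor) override {
      alloc_visitors_.push_back(visitor);
    }
    void AddFreeVisitor(Visitor visitor) override {
      free_visitors_.push_back(visitor);
    }

   private:
    Allocator* base_;  // not owned
    std::vector<Visitor> alloc_visitors_;
    std::vector<Visitor> free_visitors_;
  };

  }  // namespace tensorflow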