diff options
Diffstat (limited to 'src/google/protobuf/arena.cc')
-rw-r--r--[-rwxr-xr-x] | src/google/protobuf/arena.cc | 392 |
1 files changed, 211 insertions, 181 deletions
diff --git a/src/google/protobuf/arena.cc b/src/google/protobuf/arena.cc index e9289988..c117c9e5 100755..100644 --- a/src/google/protobuf/arena.cc +++ b/src/google/protobuf/arena.cc @@ -48,7 +48,7 @@ namespace protobuf { namespace internal { -google::protobuf::internal::SequenceNumber ArenaImpl::lifecycle_id_generator_; +std::atomic<int64> ArenaImpl::lifecycle_id_generator_; #if defined(GOOGLE_PROTOBUF_NO_THREADLOCAL) ArenaImpl::ThreadCache& ArenaImpl::thread_cache() { static internal::ThreadLocalStorage<ThreadCache>* thread_cache_ = @@ -65,24 +65,25 @@ GOOGLE_THREAD_LOCAL ArenaImpl::ThreadCache ArenaImpl::thread_cache_ = {-1, NULL} #endif void ArenaImpl::Init() { - lifecycle_id_ = lifecycle_id_generator_.GetNext(); - google::protobuf::internal::NoBarrier_Store(&hint_, 0); - google::protobuf::internal::NoBarrier_Store(&threads_, 0); + lifecycle_id_ = + lifecycle_id_generator_.fetch_add(1, std::memory_order_relaxed); + hint_.store(nullptr, std::memory_order_relaxed); + threads_.store(nullptr, std::memory_order_relaxed); if (initial_block_) { // Thread which calls Init() owns the first block. This allows the // single-threaded case to allocate on the first block without having to // perform atomic operations. - InitBlock(initial_block_, &thread_cache(), options_.initial_block_size); - ThreadInfo* info = NewThreadInfo(initial_block_); - info->next = NULL; - google::protobuf::internal::NoBarrier_Store(&threads_, - reinterpret_cast<google::protobuf::internal::AtomicWord>(info)); - google::protobuf::internal::NoBarrier_Store(&space_allocated_, - options_.initial_block_size); - CacheBlock(initial_block_); + new (initial_block_) Block(options_.initial_block_size, NULL); + SerialArena* serial = + SerialArena::New(initial_block_, &thread_cache(), this); + serial->set_next(NULL); + threads_.store(serial, std::memory_order_relaxed); + space_allocated_.store(options_.initial_block_size, + std::memory_order_relaxed); + CacheSerialArena(serial); } else { - google::protobuf::internal::NoBarrier_Store(&space_allocated_, 0); + space_allocated_.store(0, std::memory_order_relaxed); } } @@ -103,164 +104,158 @@ uint64 ArenaImpl::Reset() { return space_allocated; } -ArenaImpl::Block* ArenaImpl::NewBlock(void* me, Block* my_last_block, - size_t min_bytes) { +ArenaImpl::Block* ArenaImpl::NewBlock(Block* last_block, size_t min_bytes) { size_t size; - if (my_last_block != NULL) { + if (last_block) { // Double the current block size, up to a limit. - size = std::min(2 * my_last_block->size, options_.max_block_size); + size = std::min(2 * last_block->size(), options_.max_block_size); } else { size = options_.start_block_size; } - // Verify that min_bytes + kHeaderSize won't overflow. - GOOGLE_CHECK_LE(min_bytes, std::numeric_limits<size_t>::max() - kHeaderSize); - size = std::max(size, kHeaderSize + min_bytes); + // Verify that min_bytes + kBlockHeaderSize won't overflow. + GOOGLE_CHECK_LE(min_bytes, std::numeric_limits<size_t>::max() - kBlockHeaderSize); + size = std::max(size, kBlockHeaderSize + min_bytes); - Block* b = reinterpret_cast<Block*>(options_.block_alloc(size)); - InitBlock(b, me, size); - google::protobuf::internal::NoBarrier_AtomicIncrement(&space_allocated_, size); + void* mem = options_.block_alloc(size); + Block* b = new (mem) Block(size, last_block); + space_allocated_.fetch_add(size, std::memory_order_relaxed); return b; } -void ArenaImpl::InitBlock(Block* b, void *me, size_t size) { - b->pos = kHeaderSize; - b->size = size; - b->owner = me; - b->next = NULL; -#ifdef ADDRESS_SANITIZER - // Poison the rest of the block for ASAN. It was unpoisoned by the underlying - // malloc but it's not yet usable until we return it as part of an allocation. - ASAN_POISON_MEMORY_REGION( - reinterpret_cast<char*>(b) + b->pos, b->size - b->pos); -#endif // ADDRESS_SANITIZER -} +ArenaImpl::Block::Block(size_t size, Block* next) + : next_(next), pos_(kBlockHeaderSize), size_(size) {} -ArenaImpl::CleanupChunk* ArenaImpl::ExpandCleanupList(CleanupChunk* cleanup, - Block* b) { - size_t size = cleanup ? cleanup->size * 2 : kMinCleanupListElements; +GOOGLE_PROTOBUF_ATTRIBUTE_NOINLINE +void ArenaImpl::SerialArena::AddCleanupFallback(void* elem, + void (*cleanup)(void*)) { + size_t size = cleanup_ ? cleanup_->size * 2 : kMinCleanupListElements; size = std::min(size, kMaxCleanupListElements); size_t bytes = internal::AlignUpTo8(CleanupChunk::SizeOf(size)); - if (b->avail() < bytes) { - b = GetBlock(bytes); - } - CleanupChunk* list = - reinterpret_cast<CleanupChunk*>(AllocFromBlock(b, bytes)); - list->next = b->thread_info->cleanup; + CleanupChunk* list = reinterpret_cast<CleanupChunk*>(AllocateAligned(bytes)); + list->next = cleanup_; list->size = size; - list->len = 0; - b->thread_info->cleanup = list; - return list; + + cleanup_ = list; + cleanup_ptr_ = &list->nodes[0]; + cleanup_limit_ = &list->nodes[size]; + + AddCleanup(elem, cleanup); } -inline GOOGLE_PROTOBUF_ATTRIBUTE_ALWAYS_INLINE -void ArenaImpl::AddCleanupInBlock( - Block* b, void* elem, void (*func)(void*)) { - CleanupChunk* cleanup = b->thread_info->cleanup; - if (cleanup == NULL || cleanup->len == cleanup->size) { - cleanup = ExpandCleanupList(cleanup, b); +GOOGLE_PROTOBUF_ATTRIBUTE_FUNC_ALIGN(32) +void* ArenaImpl::AllocateAligned(size_t n) { + SerialArena* arena; + if (GOOGLE_PREDICT_TRUE(GetSerialArenaFast(&arena))) { + return arena->AllocateAligned(n); + } else { + return AllocateAlignedFallback(n); } +} - CleanupNode* node = &cleanup->nodes[cleanup->len++]; - - node->elem = elem; - node->cleanup = func; +void* ArenaImpl::AllocateAlignedAndAddCleanup(size_t n, + void (*cleanup)(void*)) { + SerialArena* arena; + if (GOOGLE_PREDICT_TRUE(GetSerialArenaFast(&arena))) { + return arena->AllocateAlignedAndAddCleanup(n, cleanup); + } else { + return AllocateAlignedAndAddCleanupFallback(n, cleanup); + } } void ArenaImpl::AddCleanup(void* elem, void (*cleanup)(void*)) { - return AddCleanupInBlock(GetBlock(0), elem, cleanup); + SerialArena* arena; + if (GOOGLE_PREDICT_TRUE(GetSerialArenaFast(&arena))) { + arena->AddCleanup(elem, cleanup); + } else { + return AddCleanupFallback(elem, cleanup); + } } -void* ArenaImpl::AllocateAligned(size_t n) { - GOOGLE_DCHECK_EQ(internal::AlignUpTo8(n), n); // Must be already aligned. - - return AllocFromBlock(GetBlock(n), n); +GOOGLE_PROTOBUF_ATTRIBUTE_NOINLINE +void* ArenaImpl::AllocateAlignedFallback(size_t n) { + return GetSerialArena()->AllocateAligned(n); } -void* ArenaImpl::AllocateAlignedAndAddCleanup(size_t n, - void (*cleanup)(void*)) { - GOOGLE_DCHECK_EQ(internal::AlignUpTo8(n), n); // Must be already aligned. +GOOGLE_PROTOBUF_ATTRIBUTE_NOINLINE +void* ArenaImpl::AllocateAlignedAndAddCleanupFallback(size_t n, + void (*cleanup)(void*)) { + return GetSerialArena()->AllocateAlignedAndAddCleanup(n, cleanup); +} - Block* b = GetBlock(n); - void* mem = AllocFromBlock(b, n); - AddCleanupInBlock(b, mem, cleanup); - return mem; +GOOGLE_PROTOBUF_ATTRIBUTE_NOINLINE +void ArenaImpl::AddCleanupFallback(void* elem, void (*cleanup)(void*)) { + GetSerialArena()->AddCleanup(elem, cleanup); } inline GOOGLE_PROTOBUF_ATTRIBUTE_ALWAYS_INLINE -ArenaImpl::Block* ArenaImpl::GetBlock(size_t n) { - Block* my_block = NULL; - +bool ArenaImpl::GetSerialArenaFast(ArenaImpl::SerialArena** arena) { // If this thread already owns a block in this arena then try to use that. // This fast path optimizes the case where multiple threads allocate from the // same arena. ThreadCache* tc = &thread_cache(); - if (tc->last_lifecycle_id_seen == lifecycle_id_) { - my_block = tc->last_block_used_; - if (my_block->avail() >= n) { - return my_block; - } + if (GOOGLE_PREDICT_TRUE(tc->last_lifecycle_id_seen == lifecycle_id_)) { + *arena = tc->last_serial_arena; + return true; } - // Check whether we own the last accessed block on this arena. - // This fast path optimizes the case where a single thread uses multiple - // arenas. - Block* b = reinterpret_cast<Block*>(google::protobuf::internal::Acquire_Load(&hint_)); - if (b != NULL && b->owner == tc) { - my_block = b; - if (my_block->avail() >= n) { - return my_block; - } + // Check whether we own the last accessed SerialArena on this arena. This + // fast path optimizes the case where a single thread uses multiple arenas. + SerialArena* serial = hint_.load(std::memory_order_acquire); + if (GOOGLE_PREDICT_TRUE(serial != NULL && serial->owner() == tc)) { + *arena = serial; + return true; } - return GetBlockSlow(tc, my_block, n); + + return false; } -inline GOOGLE_PROTOBUF_ATTRIBUTE_ALWAYS_INLINE -void* ArenaImpl::AllocFromBlock(Block* b, size_t n) { - GOOGLE_DCHECK_EQ(internal::AlignUpTo8(b->pos), b->pos); // Must be already aligned. - GOOGLE_DCHECK_EQ(internal::AlignUpTo8(n), n); // Must be already aligned. - GOOGLE_DCHECK_GE(b->avail(), n); - size_t p = b->pos; - b->pos = p + n; +ArenaImpl::SerialArena* ArenaImpl::GetSerialArena() { + SerialArena* arena; + if (GOOGLE_PREDICT_TRUE(GetSerialArenaFast(&arena))) { + return arena; + } else { + return GetSerialArenaFallback(&thread_cache()); + } +} + +GOOGLE_PROTOBUF_ATTRIBUTE_NOINLINE +void* ArenaImpl::SerialArena::AllocateAlignedFallback(size_t n) { + // Sync back to current's pos. + head_->set_pos(head_->size() - (limit_ - ptr_)); + + head_ = arena_->NewBlock(head_, n); + ptr_ = head_->Pointer(head_->pos()); + limit_ = head_->Pointer(head_->size()); + #ifdef ADDRESS_SANITIZER - ASAN_UNPOISON_MEMORY_REGION(reinterpret_cast<char*>(b) + p, n); + ASAN_POISON_MEMORY_REGION(ptr_, limit_ - ptr_); #endif // ADDRESS_SANITIZER - return reinterpret_cast<char*>(b) + p; -} -ArenaImpl::Block* ArenaImpl::GetBlockSlow(void* me, Block* my_full_block, - size_t n) { - ThreadInfo* info = - my_full_block ? my_full_block->thread_info : GetThreadInfo(me, n); - GOOGLE_DCHECK(info != NULL); - Block* b = info->head; - if (b->avail() < n) { - Block* new_b = NewBlock(me, b, n); - new_b->thread_info = info; - new_b->next = b; - info->head = new_b; - b = new_b; - } - CacheBlock(b); - return b; + return AllocateAligned(n); } uint64 ArenaImpl::SpaceAllocated() const { - return google::protobuf::internal::NoBarrier_Load(&space_allocated_); + return space_allocated_.load(std::memory_order_relaxed); } uint64 ArenaImpl::SpaceUsed() const { - ThreadInfo* info = - reinterpret_cast<ThreadInfo*>(google::protobuf::internal::Acquire_Load(&threads_)); + SerialArena* serial = threads_.load(std::memory_order_acquire); uint64 space_used = 0; - - for ( ; info; info = info->next) { - // Remove the overhead of the ThreadInfo itself. - space_used -= sizeof(ThreadInfo); - for (Block* b = info->head; b; b = b->next) { - space_used += (b->pos - kHeaderSize); - } + for ( ; serial; serial = serial->next()) { + space_used += serial->SpaceUsed(); } + return space_used; +} +uint64 ArenaImpl::SerialArena::SpaceUsed() const { + // Get current block's size from ptr_ (since we can't trust head_->pos(). + uint64 space_used = ptr_ - head_->Pointer(kBlockHeaderSize); + // Get subsequent block size from b->pos(). + for (Block* b = head_->next(); b; b = b->next()) { + space_used += (b->pos() - kBlockHeaderSize); + } + // Remove the overhead of the SerialArena itself. + space_used -= kSerialArenaSize; return space_used; } @@ -268,30 +263,44 @@ uint64 ArenaImpl::FreeBlocks() { uint64 space_allocated = 0; // By omitting an Acquire barrier we ensure that any user code that doesn't // properly synchronize Reset() or the destructor will throw a TSAN warning. - ThreadInfo* info = - reinterpret_cast<ThreadInfo*>(google::protobuf::internal::NoBarrier_Load(&threads_)); + SerialArena* serial = threads_.load(std::memory_order_relaxed); + + while (serial) { + // This is inside a block we are freeing, so we need to read it now. + SerialArena* next = serial->next(); + space_allocated += ArenaImpl::SerialArena::Free(serial, initial_block_, + options_.block_dealloc); + // serial is dead now. + serial = next; + } - while (info) { + return space_allocated; +} + +uint64 ArenaImpl::SerialArena::Free(ArenaImpl::SerialArena* serial, + Block* initial_block, + void (*block_dealloc)(void*, size_t)) { + uint64 space_allocated = 0; + + // We have to be careful in this function, since we will be freeing the Block + // that contains this SerialArena. Be careful about accessing |serial|. + + for (Block* b = serial->head_; b; ) { // This is inside the block we are freeing, so we need to read it now. - ThreadInfo* next_info = info->next; - for (Block* b = info->head; b; ) { - // This is inside the block we are freeing, so we need to read it now. - Block* next_block = b->next; - space_allocated += (b->size); + Block* next_block = b->next(); + space_allocated += (b->size()); #ifdef ADDRESS_SANITIZER - // This memory was provided by the underlying allocator as unpoisoned, so - // return it in an unpoisoned state. - ASAN_UNPOISON_MEMORY_REGION(reinterpret_cast<char*>(b), b->size); + // This memory was provided by the underlying allocator as unpoisoned, so + // return it in an unpoisoned state. + ASAN_UNPOISON_MEMORY_REGION(b->Pointer(0), b->size()); #endif // ADDRESS_SANITIZER - if (b != initial_block_) { - options_.block_dealloc(b, b->size); - } - - b = next_block; + if (b != initial_block) { + block_dealloc(b, b->size()); } - info = next_info; + + b = next_block; } return space_allocated; @@ -300,63 +309,84 @@ uint64 ArenaImpl::FreeBlocks() { void ArenaImpl::CleanupList() { // By omitting an Acquire barrier we ensure that any user code that doesn't // properly synchronize Reset() or the destructor will throw a TSAN warning. - ThreadInfo* info = - reinterpret_cast<ThreadInfo*>(google::protobuf::internal::NoBarrier_Load(&threads_)); - - for ( ; info; info = info->next) { - CleanupChunk* list = info->cleanup; - while (list) { - size_t n = list->len; - CleanupNode* node = &list->nodes[list->len - 1]; - for (size_t i = 0; i < n; i++, node--) { - node->cleanup(node->elem); - } - list = list->next; - } + SerialArena* serial = threads_.load(std::memory_order_relaxed); + + for ( ; serial; serial = serial->next()) { + serial->CleanupList(); } } -ArenaImpl::ThreadInfo* ArenaImpl::NewThreadInfo(Block* b) { - GOOGLE_DCHECK(FindThreadInfo(b->owner) == NULL); - ThreadInfo* info = - reinterpret_cast<ThreadInfo*>(AllocFromBlock(b, sizeof(ThreadInfo))); - b->thread_info = info; - info->owner = b->owner; - info->head = b; - info->cleanup = NULL; - return info; +void ArenaImpl::SerialArena::CleanupList() { + if (cleanup_ != NULL) { + CleanupListFallback(); + } } -ArenaImpl::ThreadInfo* ArenaImpl::FindThreadInfo(void* me) { - ThreadInfo* info = - reinterpret_cast<ThreadInfo*>(google::protobuf::internal::Acquire_Load(&threads_)); - for ( ; info; info = info->next) { - if (info->owner == me) { - return info; +void ArenaImpl::SerialArena::CleanupListFallback() { + // Cleanup newest chunk: ptrs give us length. + size_t n = cleanup_ptr_ - &cleanup_->nodes[0]; + CleanupNode* node = cleanup_ptr_; + for (size_t i = 0; i < n; i++) { + --node; + node->cleanup(node->elem); + } + + // Cleanup older chunks, which are known to be full. + CleanupChunk* list = cleanup_->next; + while (list) { + size_t n = list->size; + CleanupNode* node = &list->nodes[list->size]; + for (size_t i = 0; i < n; i++) { + --node; + node->cleanup(node->elem); } + list = list->next; } +} - return NULL; +ArenaImpl::SerialArena* ArenaImpl::SerialArena::New(Block* b, void* owner, + ArenaImpl* arena) { + GOOGLE_DCHECK_EQ(b->pos(), kBlockHeaderSize); // Should be a fresh block + GOOGLE_DCHECK_LE(kBlockHeaderSize + kSerialArenaSize, b->size()); + SerialArena* serial = + reinterpret_cast<SerialArena*>(b->Pointer(kBlockHeaderSize)); + b->set_pos(kBlockHeaderSize + kSerialArenaSize); + serial->arena_ = arena; + serial->owner_ = owner; + serial->head_ = b; + serial->ptr_ = b->Pointer(b->pos()); + serial->limit_ = b->Pointer(b->size()); + serial->cleanup_ = NULL; + serial->cleanup_ptr_ = NULL; + serial->cleanup_limit_ = NULL; + return serial; } -ArenaImpl::ThreadInfo* ArenaImpl::GetThreadInfo(void* me, size_t n) { - ThreadInfo* info = FindThreadInfo(me); +GOOGLE_PROTOBUF_ATTRIBUTE_NOINLINE +ArenaImpl::SerialArena* ArenaImpl::GetSerialArenaFallback(void* me) { + // Look for this SerialArena in our linked list. + SerialArena* serial = threads_.load(std::memory_order_acquire); + for ( ; serial; serial = serial->next()) { + if (serial->owner() == me) { + break; + } + } - if (!info) { - // This thread doesn't have any ThreadInfo, which also means it doesn't have - // any blocks yet. So we'll allocate its first block now. - Block* b = NewBlock(me, NULL, sizeof(ThreadInfo) + n); - info = NewThreadInfo(b); + if (!serial) { + // This thread doesn't have any SerialArena, which also means it doesn't + // have any blocks yet. So we'll allocate its first block now. + Block* b = NewBlock(NULL, kSerialArenaSize); + serial = SerialArena::New(b, me, this); - google::protobuf::internal::AtomicWord head; + SerialArena* head = threads_.load(std::memory_order_relaxed); do { - head = google::protobuf::internal::NoBarrier_Load(&threads_); - info->next = reinterpret_cast<ThreadInfo*>(head); - } while (google::protobuf::internal::Release_CompareAndSwap( - &threads_, head, reinterpret_cast<google::protobuf::internal::AtomicWord>(info)) != head); + serial->set_next(head); + } while (!threads_.compare_exchange_weak( + head, serial, std::memory_order_release, std::memory_order_relaxed)); } - return info; + CacheSerialArena(serial); + return serial; } } // namespace internal |