diff options
author | reed <reed@chromium.org> | 2015-04-28 17:50:31 -0700 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2015-04-28 17:50:32 -0700 |
commit | 5b6db07fb5d3b67476db6df126eb8290d49e564d (patch) | |
tree | c39da2702fc9aeeb3a13f9f9e3cc959ccc81261e | |
parent | a73239a0096370221d3dfababf339dd6d3fed84f (diff) |
SkRWBuffer for thread-safe 'stream' sharing
WIP
- Can accumulate (write) data in one thread, and share snapshots of it in other threads
... e.g. network accumulates image data, and periodically we want to decode/draw it
- If this sort of thing sticks, should we promote SkData to have the same generality as
SkRBuffer?
BUG=skia:
TBR=
Review URL: https://codereview.chromium.org/1106113002
-rw-r--r-- | gyp/core.gypi | 1 | ||||
-rw-r--r-- | src/core/SkRWBuffer.cpp | 353 | ||||
-rw-r--r-- | src/core/SkRWBuffer.h | 97 | ||||
-rw-r--r-- | tests/DataRefTest.cpp | 81 |
4 files changed, 532 insertions, 0 deletions
diff --git a/gyp/core.gypi b/gyp/core.gypi index 94878e1950..190236f11b 100644 --- a/gyp/core.gypi +++ b/gyp/core.gypi @@ -178,6 +178,7 @@ '<(skia_src_path)/core/SkRRect.cpp', '<(skia_src_path)/core/SkRTree.h', '<(skia_src_path)/core/SkRTree.cpp', + '<(skia_src_path)/core/SkRWBuffer.cpp', '<(skia_src_path)/core/SkScalar.cpp', '<(skia_src_path)/core/SkScalerContext.cpp', '<(skia_src_path)/core/SkScalerContext.h', diff --git a/src/core/SkRWBuffer.cpp b/src/core/SkRWBuffer.cpp new file mode 100644 index 0000000000..33d82af4f0 --- /dev/null +++ b/src/core/SkRWBuffer.cpp @@ -0,0 +1,353 @@ +/* + * Copyright 2015 Google Inc. + * + * Use of this source code is governed by a BSD-style license that can be + * found in the LICENSE file. + */ + +#include "SkRWBuffer.h" +#include "SkStream.h" + +// Force small chunks to be a page's worth +static const size_t kMinAllocSize = 4096; + +struct SkBufferBlock { + SkBufferBlock* fNext; + size_t fUsed; + size_t fCapacity; + + const void* startData() const { return this + 1; }; + + size_t avail() const { return fCapacity - fUsed; } + void* availData() { return (char*)this->startData() + fUsed; } + + static SkBufferBlock* Alloc(size_t length) { + size_t capacity = LengthToCapacity(length); + SkBufferBlock* block = (SkBufferBlock*)sk_malloc_throw(sizeof(SkBufferBlock) + capacity); + block->fNext = NULL; + block->fUsed = 0; + block->fCapacity = capacity; + return block; + } + + // Return number of bytes actually appended + size_t append(const void* src, size_t length) { + this->validate(); + size_t amount = SkTMin(this->avail(), length); + memcpy(this->availData(), src, amount); + fUsed += amount; + this->validate(); + return amount; + } + + void validate() const { +#ifdef SK_DEBUG + SkASSERT(fCapacity > 0); + SkASSERT(fUsed <= fCapacity); +#endif + } + +private: + static size_t LengthToCapacity(size_t length) { + const size_t minSize = kMinAllocSize - sizeof(SkBufferBlock); + return SkTMax(length, minSize); + } +}; + +struct SkBufferHead { + mutable int32_t fRefCnt; + SkBufferBlock fBlock; + + static size_t LengthToCapacity(size_t length) { + const size_t minSize = kMinAllocSize - sizeof(SkBufferHead); + return SkTMax(length, minSize); + } + + static SkBufferHead* Alloc(size_t length) { + size_t capacity = LengthToCapacity(length); + size_t size = sizeof(SkBufferHead) + capacity; + SkBufferHead* head = (SkBufferHead*)sk_malloc_throw(size); + head->fRefCnt = 1; + head->fBlock.fNext = NULL; + head->fBlock.fUsed = 0; + head->fBlock.fCapacity = capacity; + return head; + } + + void ref() const { + SkASSERT(fRefCnt > 0); + sk_atomic_inc(&fRefCnt); + } + + void unref() const { + SkASSERT(fRefCnt > 0); + // A release here acts in place of all releases we "should" have been doing in ref(). + if (1 == sk_atomic_fetch_add(&fRefCnt, -1, sk_memory_order_acq_rel)) { + // Like unique(), the acquire is only needed on success. + SkBufferBlock* block = fBlock.fNext; + sk_free((void*)this); + while (block) { + SkBufferBlock* next = block->fNext; + sk_free(block); + block = next; + } + } + } + + void validate(size_t minUsed, SkBufferBlock* tail = NULL) const { +#ifdef SK_DEBUG + SkASSERT(fRefCnt > 0); + size_t totalUsed = 0; + const SkBufferBlock* block = &fBlock; + const SkBufferBlock* lastBlock = block; + while (block) { + block->validate(); + totalUsed += block->fUsed; + lastBlock = block; + block = block->fNext; + } + SkASSERT(minUsed <= totalUsed); + if (tail) { + SkASSERT(tail == lastBlock); + } +#endif + } +}; + +SkROBuffer::SkROBuffer(const SkBufferHead* head, size_t used) : fHead(head), fUsed(used) { + if (head) { + fHead->ref(); + SkASSERT(used > 0); + head->validate(used); + } else { + SkASSERT(0 == used); + } +} + +SkROBuffer::~SkROBuffer() { + if (fHead) { + fHead->validate(fUsed); + fHead->unref(); + } +} + +SkROBuffer::Iter::Iter(const SkROBuffer* buffer) { + this->reset(buffer); +} + +void SkROBuffer::Iter::reset(const SkROBuffer* buffer) { + if (buffer) { + fBlock = &buffer->fHead->fBlock; + fRemaining = buffer->fUsed; + } else { + fBlock = NULL; + fRemaining = 0; + } +} + +const void* SkROBuffer::Iter::data() const { + return fRemaining ? fBlock->startData() : NULL; +} + +size_t SkROBuffer::Iter::size() const { + return SkTMin(fBlock->fUsed, fRemaining); +} + +bool SkROBuffer::Iter::next() { + if (fRemaining) { + fRemaining -= this->size(); + fBlock = fBlock->fNext; + } + return fRemaining != 0; +} + +SkRWBuffer::SkRWBuffer(size_t initialCapacity) : fHead(NULL), fTail(NULL), fTotalUsed(0) {} + +SkRWBuffer::~SkRWBuffer() { + this->validate(); + fHead->unref(); +} + +void SkRWBuffer::append(const void* src, size_t length) { + this->validate(); + if (0 == length) { + return; + } + + fTotalUsed += length; + + if (NULL == fHead) { + fHead = SkBufferHead::Alloc(length); + fTail = &fHead->fBlock; + } + + size_t written = fTail->append(src, length); + SkASSERT(written <= length); + src = (const char*)src + written; + length -= written; + + if (length) { + SkBufferBlock* block = SkBufferBlock::Alloc(length); + fTail->fNext = block; + fTail = block; + written = fTail->append(src, length); + SkASSERT(written == length); + } + this->validate(); +} + +void* SkRWBuffer::append(size_t length) { + this->validate(); + if (0 == length) { + return NULL; + } + + fTotalUsed += length; + + if (NULL == fHead) { + fHead = SkBufferHead::Alloc(length); + fTail = &fHead->fBlock; + } else if (fTail->avail() < length) { + SkBufferBlock* block = SkBufferBlock::Alloc(length); + fTail->fNext = block; + fTail = block; + } + + fTail->fUsed += length; + this->validate(); + return (char*)fTail->availData() - length; +} + +#ifdef SK_DEBUG +void SkRWBuffer::validate() const { + if (fHead) { + fHead->validate(fTotalUsed, fTail); + } else { + SkASSERT(NULL == fTail); + SkASSERT(0 == fTotalUsed); + } +} +#endif + +SkROBuffer* SkRWBuffer::newRBufferSnapshot() const { + return SkNEW_ARGS(SkROBuffer, (fHead, fTotalUsed)); +} + +/////////////////////////////////////////////////////////////////////////////////////////////////// + +class SkROBufferStreamAsset : public SkStreamAsset { + void validate() const { +#ifdef SK_DEBUG + SkASSERT(fGlobalOffset <= fBuffer->size()); + SkASSERT(fLocalOffset <= fIter.size()); + SkASSERT(fLocalOffset <= fGlobalOffset); +#endif + } + +#ifdef SK_DEBUG + class AutoValidate { + SkROBufferStreamAsset* fStream; + public: + AutoValidate(SkROBufferStreamAsset* stream) : fStream(stream) { stream->validate(); } + ~AutoValidate() { fStream->validate(); } + }; + #define AUTO_VALIDATE AutoValidate av(this); +#else + #define AUTO_VALIDATE +#endif + +public: + SkROBufferStreamAsset(const SkROBuffer* buffer) : fBuffer(SkRef(buffer)), fIter(buffer) { + fGlobalOffset = fLocalOffset = 0; + } + + virtual ~SkROBufferStreamAsset() { fBuffer->unref(); } + + size_t getLength() const override { return fBuffer->size(); } + + bool rewind() override { + AUTO_VALIDATE + fIter.reset(fBuffer); + fGlobalOffset = fLocalOffset = 0; + return true; + } + + size_t read(void* dst, size_t request) override { + AUTO_VALIDATE + size_t bytesRead = 0; + for (;;) { + size_t size = fIter.size(); + SkASSERT(fLocalOffset <= size); + size_t avail = SkTMin(size - fLocalOffset, request - bytesRead); + if (dst) { + memcpy(dst, (const char*)fIter.data() + fLocalOffset, avail); + dst = (char*)dst + avail; + } + bytesRead += avail; + fLocalOffset += avail; + SkASSERT(bytesRead <= request); + if (bytesRead == request) { + break; + } + // If we get here, we've exhausted the current iter + SkASSERT(fLocalOffset == size); + fLocalOffset = 0; + if (!fIter.next()) { + break; // ran out of data + } + } + fGlobalOffset += bytesRead; + SkASSERT(fGlobalOffset <= fBuffer->size()); + return bytesRead; + } + + bool isAtEnd() const override { + return fBuffer->size() == fGlobalOffset; + } + + SkStreamAsset* duplicate() const override { + return SkNEW_ARGS(SkROBufferStreamAsset, (fBuffer)); + } + + size_t getPosition() const { + return fGlobalOffset; + } + + bool seek(size_t position) { + AUTO_VALIDATE + if (position < fGlobalOffset) { + this->rewind(); + } + (void)this->skip(position - fGlobalOffset); + return true; + } + + bool move(long offset) { + AUTO_VALIDATE + offset += fGlobalOffset; + if (offset <= 0) { + this->rewind(); + } else { + (void)this->seek(SkToSizeT(offset)); + } + return true; + } + + SkStreamAsset* fork() const override { + SkStreamAsset* clone = this->duplicate(); + clone->seek(this->getPosition()); + return clone; + } + + +private: + const SkROBuffer* fBuffer; + SkROBuffer::Iter fIter; + size_t fLocalOffset; + size_t fGlobalOffset; +}; + +SkStreamAsset* SkRWBuffer::newStreamSnapshot() const { + SkAutoTUnref<SkROBuffer> buffer(this->newRBufferSnapshot()); + return SkNEW_ARGS(SkROBufferStreamAsset, (buffer)); +} diff --git a/src/core/SkRWBuffer.h b/src/core/SkRWBuffer.h new file mode 100644 index 0000000000..89cb425a7f --- /dev/null +++ b/src/core/SkRWBuffer.h @@ -0,0 +1,97 @@ +/* + * Copyright 2015 Google Inc. + * + * Use of this source code is governed by a BSD-style license that can be + * found in the LICENSE file. + */ + +#ifndef SkRWBuffer_DEFINED +#define SkRWBuffer_DEFINED + +#include "SkRefCnt.h" + +struct SkBufferBlock; +struct SkBufferHead; +class SkRWBuffer; +class SkStreamAsset; + +/** + * Contains a read-only, thread-sharable block of memory. To access the memory, the caller must + * instantiate a local iterator, as the memory is stored in 1 or more contiguous blocks. + */ +class SkROBuffer : public SkRefCnt { +public: + /** + * Return the logical length of the data owned/shared by this buffer. It may be stored in + * multiple contiguous blocks, accessible via the iterator. + */ + size_t size() const { return fUsed; } + + class Iter { + public: + Iter(const SkROBuffer*); + + void reset(const SkROBuffer*); + + /** + * Return the current continuous block of memory, or NULL if the iterator is exhausted + */ + const void* data() const; + + /** + * Returns the number of bytes in the current continguous block of memory, or 0 if the + * iterator is exhausted. + */ + size_t size() const; + + /** + * Advance to the next contiguous block of memory, returning true if there is another + * block, or false if the iterator is exhausted. + */ + bool next(); + + private: + const SkBufferBlock* fBlock; + size_t fRemaining; + }; + +private: + SkROBuffer(const SkBufferHead* head, size_t used); + virtual ~SkROBuffer(); + + const SkBufferHead* fHead; + const size_t fUsed; + + friend class SkRWBuffer; +}; + +/** + * Accumulates bytes of memory that are "appended" to it, growing internal storage as needed. + * The growth is done such that at any time, a RBuffer or StreamAsset can be snapped off, which + * can see the previously stored bytes, but which will be unaware of any future writes. + */ +class SkRWBuffer { +public: + SkRWBuffer(size_t initialCapacity = 0); + ~SkRWBuffer(); + + size_t size() const { return fTotalUsed; } + void append(const void* buffer, size_t length); + void* append(size_t length); + + SkROBuffer* newRBufferSnapshot() const; + SkStreamAsset* newStreamSnapshot() const; + +#ifdef SK_DEBUG + void validate() const; +#else + void validate() const {} +#endif + +private: + SkBufferHead* fHead; + SkBufferBlock* fTail; + size_t fTotalUsed; +}; + +#endif diff --git a/tests/DataRefTest.cpp b/tests/DataRefTest.cpp index 099e9098cb..981ac54fcf 100644 --- a/tests/DataRefTest.cpp +++ b/tests/DataRefTest.cpp @@ -232,3 +232,84 @@ DEF_TEST(Data, reporter) { test_cstring(reporter); test_files(reporter); } + +/////////////////////////////////////////////////////////////////////////////////////////////////// +#include "SkRWBuffer.h" + +const char gABC[] = "abcdefghijklmnopqrstuvwxyz"; + +static void check_abcs(skiatest::Reporter* reporter, const char buffer[], size_t size) { + REPORTER_ASSERT(reporter, size % 26 == 0); + for (size_t offset = 0; offset < size; offset += 26) { + REPORTER_ASSERT(reporter, !memcmp(&buffer[offset], gABC, 26)); + } +} + +// stream should contains an integral number of copies of gABC. +static void check_alphabet_stream(skiatest::Reporter* reporter, SkStream* stream) { + REPORTER_ASSERT(reporter, stream->hasLength()); + size_t size = stream->getLength(); + REPORTER_ASSERT(reporter, size % 26 == 0); + + SkAutoTMalloc<char> storage(size); + char* array = storage.get(); + size_t bytesRead = stream->read(array, size); + REPORTER_ASSERT(reporter, bytesRead == size); + check_abcs(reporter, array, size); + + // try checking backwards + for (size_t offset = size; offset > 0; offset -= 26) { + REPORTER_ASSERT(reporter, stream->seek(offset - 26)); + REPORTER_ASSERT(reporter, stream->getPosition() == offset - 26); + REPORTER_ASSERT(reporter, stream->read(array, 26) == 26); + check_abcs(reporter, array, 26); + REPORTER_ASSERT(reporter, stream->getPosition() == offset); + } +} + +// reader should contains an integral number of copies of gABC. +static void check_alphabet_buffer(skiatest::Reporter* reporter, const SkROBuffer* reader) { + size_t size = reader->size(); + REPORTER_ASSERT(reporter, size % 26 == 0); + + SkAutoTMalloc<char> storage(size); + SkROBuffer::Iter iter(reader); + size_t offset = 0; + do { + SkASSERT(offset + iter.size() <= size); + memcpy(storage.get() + offset, iter.data(), iter.size()); + offset += iter.size(); + } while (iter.next()); + REPORTER_ASSERT(reporter, offset == size); + check_abcs(reporter, storage.get(), size); +} + +DEF_TEST(RWBuffer, reporter) { + // Knowing that the default capacity is 4096, choose N large enough so we force it to use + // multiple buffers internally. + const int N = 1000; + SkROBuffer* readers[N]; + SkStream* streams[N]; + + { + SkRWBuffer buffer; + for (int i = 0; i < N; ++i) { + if (0 == (i & 1)) { + buffer.append(gABC, 26); + } else { + memcpy(buffer.append(26), gABC, 26); + } + readers[i] = buffer.newRBufferSnapshot(); + streams[i] = buffer.newStreamSnapshot(); + } + REPORTER_ASSERT(reporter, N*26 == buffer.size()); + } + + for (int i = 0; i < N; ++i) { + REPORTER_ASSERT(reporter, (i + 1) * 26U == readers[i]->size()); + check_alphabet_buffer(reporter, readers[i]); + check_alphabet_stream(reporter, streams[i]); + readers[i]->unref(); + SkDELETE(streams[i]); + } +} |