aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar reed <reed@chromium.org>2015-04-28 17:50:31 -0700
committerGravatar Commit bot <commit-bot@chromium.org>2015-04-28 17:50:32 -0700
commit5b6db07fb5d3b67476db6df126eb8290d49e564d (patch)
treec39da2702fc9aeeb3a13f9f9e3cc959ccc81261e
parenta73239a0096370221d3dfababf339dd6d3fed84f (diff)
SkRWBuffer for thread-safe 'stream' sharing
WIP - Can accumulate (write) data in one thread, and share snapshots of it in other threads ... e.g. network accumulates image data, and periodically we want to decode/draw it - If this sort of thing sticks, should we promote SkData to have the same generality as SkRBuffer? BUG=skia: TBR= Review URL: https://codereview.chromium.org/1106113002
-rw-r--r--gyp/core.gypi1
-rw-r--r--src/core/SkRWBuffer.cpp353
-rw-r--r--src/core/SkRWBuffer.h97
-rw-r--r--tests/DataRefTest.cpp81
4 files changed, 532 insertions, 0 deletions
diff --git a/gyp/core.gypi b/gyp/core.gypi
index 94878e1950..190236f11b 100644
--- a/gyp/core.gypi
+++ b/gyp/core.gypi
@@ -178,6 +178,7 @@
'<(skia_src_path)/core/SkRRect.cpp',
'<(skia_src_path)/core/SkRTree.h',
'<(skia_src_path)/core/SkRTree.cpp',
+ '<(skia_src_path)/core/SkRWBuffer.cpp',
'<(skia_src_path)/core/SkScalar.cpp',
'<(skia_src_path)/core/SkScalerContext.cpp',
'<(skia_src_path)/core/SkScalerContext.h',
diff --git a/src/core/SkRWBuffer.cpp b/src/core/SkRWBuffer.cpp
new file mode 100644
index 0000000000..33d82af4f0
--- /dev/null
+++ b/src/core/SkRWBuffer.cpp
@@ -0,0 +1,353 @@
+/*
+ * Copyright 2015 Google Inc.
+ *
+ * Use of this source code is governed by a BSD-style license that can be
+ * found in the LICENSE file.
+ */
+
+#include "SkRWBuffer.h"
+#include "SkStream.h"
+
+// Force small chunks to be a page's worth
+static const size_t kMinAllocSize = 4096;
+
+// One chunk in the singly-linked list of storage. The payload bytes live in
+// the same malloc block, immediately after this header.
+struct SkBufferBlock {
+ SkBufferBlock* fNext; // updated by the writer
+ size_t fUsed; // updated by the writer
+ size_t fCapacity;
+
+ // First byte of the payload (header and payload share one allocation).
+ const void* startData() const { return this + 1; }
+
+ // Bytes of capacity not yet written in this block.
+ size_t avail() const { return fCapacity - fUsed; }
+ // Address the next append() will copy into.
+ void* availData() { return (char*)this->startData() + fUsed; }
+
+ static SkBufferBlock* Alloc(size_t length) {
+ size_t capacity = LengthToCapacity(length);
+ SkBufferBlock* block = (SkBufferBlock*)sk_malloc_throw(sizeof(SkBufferBlock) + capacity);
+ block->fNext = NULL;
+ block->fUsed = 0;
+ block->fCapacity = capacity;
+ return block;
+ }
+
+ // Copy up to length bytes from src into this block. Return the number of
+ // bytes actually appended (less than length if the block fills up).
+ size_t append(const void* src, size_t length) {
+ this->validate();
+ size_t amount = SkTMin(this->avail(), length);
+ memcpy(this->availData(), src, amount);
+ fUsed += amount;
+ this->validate();
+ return amount;
+ }
+
+ void validate() const {
+#ifdef SK_DEBUG
+ SkASSERT(fCapacity > 0);
+ SkASSERT(fUsed <= fCapacity);
+#endif
+ }
+
+private:
+ // Round small requests up so header + payload is at least a page's worth.
+ static size_t LengthToCapacity(size_t length) {
+ const size_t minSize = kMinAllocSize - sizeof(SkBufferBlock);
+ return SkTMax(length, minSize);
+ }
+};
+
+// Head of the storage list: holds the atomic ref count shared by the writer
+// and every SkROBuffer snapshot, plus the first block inline.
+struct SkBufferHead {
+ mutable int32_t fRefCnt;
+ SkBufferBlock fBlock;
+
+ // Round small requests up so the whole head allocation is at least a page.
+ static size_t LengthToCapacity(size_t length) {
+ const size_t minSize = kMinAllocSize - sizeof(SkBufferHead);
+ return SkTMax(length, minSize);
+ }
+
+ // Allocate a head (refcount 1) whose inline block can hold at least length.
+ static SkBufferHead* Alloc(size_t length) {
+ size_t capacity = LengthToCapacity(length);
+ size_t size = sizeof(SkBufferHead) + capacity;
+ SkBufferHead* head = (SkBufferHead*)sk_malloc_throw(size);
+ head->fRefCnt = 1;
+ head->fBlock.fNext = NULL;
+ head->fBlock.fUsed = 0;
+ head->fBlock.fCapacity = capacity;
+ return head;
+ }
+
+ void ref() const {
+ SkASSERT(fRefCnt > 0);
+ sk_atomic_inc(&fRefCnt);
+ }
+
+ void unref() const {
+ SkASSERT(fRefCnt > 0);
+ // A release here acts in place of all releases we "should" have been doing in ref().
+ if (1 == sk_atomic_fetch_add(&fRefCnt, -1, sk_memory_order_acq_rel)) {
+ // Like unique(), the acquire is only needed on success.
+ // Capture the next pointer BEFORE freeing this, since fBlock lives
+ // inside this allocation.
+ SkBufferBlock* block = fBlock.fNext;
+ sk_free((void*)this);
+ while (block) {
+ SkBufferBlock* next = block->fNext;
+ sk_free(block);
+ block = next;
+ }
+ }
+ }
+
+ // Debug-only: walk the whole list, checking each block and that the total
+ // bytes used is at least minUsed; optionally check tail is the last block.
+ void validate(size_t minUsed, SkBufferBlock* tail = NULL) const {
+#ifdef SK_DEBUG
+ SkASSERT(fRefCnt > 0);
+ size_t totalUsed = 0;
+ const SkBufferBlock* block = &fBlock;
+ const SkBufferBlock* lastBlock = block;
+ while (block) {
+ block->validate();
+ totalUsed += block->fUsed;
+ lastBlock = block;
+ block = block->fNext;
+ }
+ SkASSERT(minUsed <= totalUsed);
+ if (tail) {
+ SkASSERT(tail == lastBlock);
+ }
+#endif
+ }
+};
+
+// Take a shared ref on head. head may be NULL (an empty snapshot), in which
+// case used must be 0.
+SkROBuffer::SkROBuffer(const SkBufferHead* head, size_t used) : fHead(head), fUsed(used) {
+ if (head) {
+ fHead->ref();
+ SkASSERT(used > 0);
+ head->validate(used);
+ } else {
+ SkASSERT(0 == used);
+ }
+}
+
+// Drop our shared claim on the block list; the last owner frees it.
+SkROBuffer::~SkROBuffer() {
+ if (NULL == fHead) {
+ return;
+ }
+ fHead->validate(fUsed);
+ fHead->unref();
+}
+
+// Position the iterator at the first block of buffer (NULL buffer is ok).
+SkROBuffer::Iter::Iter(const SkROBuffer* buffer) {
+ this->reset(buffer);
+}
+
+// (Re)position the iterator at the first block of buffer.
+void SkROBuffer::Iter::reset(const SkROBuffer* buffer) {
+ // Also guard against a snapshot of an empty SkRWBuffer: its fHead is NULL
+ // (the ctor permits head==NULL when used==0), so dereferencing it here --
+ // or later in size()/data() -- would crash. Treat it as an empty range.
+ if (buffer && buffer->fHead) {
+ fBlock = &buffer->fHead->fBlock;
+ fRemaining = buffer->fUsed;
+ } else {
+ fBlock = NULL;
+ fRemaining = 0;
+ }
+}
+
+// Current contiguous block of memory, or NULL once the iterator is exhausted.
+const void* SkROBuffer::Iter::data() const {
+ if (0 == fRemaining) {
+ return NULL;
+ }
+ return fBlock->startData();
+}
+
+// Number of bytes visible in the current block (clamped to the snapshot's
+// logical length), or 0 when the iterator is exhausted.
+size_t SkROBuffer::Iter::size() const {
+ // fBlock is NULL for an empty/exhausted iterator; the unconditional
+ // dereference previously crashed in that case.
+ if (NULL == fBlock) {
+ return 0;
+ }
+ return SkTMin(fBlock->fUsed, fRemaining);
+}
+
+// Advance to the next contiguous block; return true if more data remains.
+bool SkROBuffer::Iter::next() {
+ if (fRemaining) {
+ // size() clamps to fRemaining, so this subtraction cannot underflow.
+ fRemaining -= this->size();
+ fBlock = fBlock->fNext;
+ }
+ return fRemaining != 0;
+}
+
+SkRWBuffer::SkRWBuffer(size_t initialCapacity) : fHead(NULL), fTail(NULL), fTotalUsed(0) {}
+
+SkRWBuffer::~SkRWBuffer() {
+ this->validate();
+ // fHead is NULL until the first append(), so destroying a never-written
+ // buffer must not call unref() through a NULL pointer.
+ if (fHead) {
+ fHead->unref();
+ }
+}
+
+// Copy length bytes from src onto the end of the buffer, growing storage as
+// needed. Existing blocks are never resized or moved, so previously-taken
+// snapshots remain valid.
+void SkRWBuffer::append(const void* src, size_t length) {
+ this->validate();
+ if (0 == length) {
+ return;
+ }
+
+ fTotalUsed += length;
+
+ if (NULL == fHead) {
+ // First write: allocate the head, whose inline block becomes the tail.
+ fHead = SkBufferHead::Alloc(length);
+ fTail = &fHead->fBlock;
+ }
+
+ size_t written = fTail->append(src, length);
+ SkASSERT(written <= length);
+ src = (const char*)src + written;
+ length -= written;
+
+ // Whatever didn't fit in the tail goes into one new block sized for it.
+ if (length) {
+ SkBufferBlock* block = SkBufferBlock::Alloc(length);
+ fTail->fNext = block;
+ fTail = block;
+ written = fTail->append(src, length);
+ SkASSERT(written == length);
+ }
+ this->validate();
+}
+
+// Reserve length contiguous (uninitialized) bytes at the end of the buffer
+// and return a writable pointer to them, or NULL if length is 0. Unlike the
+// copying append(), the bytes always land in a single block.
+void* SkRWBuffer::append(size_t length) {
+ this->validate();
+ if (0 == length) {
+ return NULL;
+ }
+
+ fTotalUsed += length;
+
+ if (NULL == fHead) {
+ fHead = SkBufferHead::Alloc(length);
+ fTail = &fHead->fBlock;
+ } else if (fTail->avail() < length) {
+ // Won't fit contiguously in the tail: start a new block sized for it.
+ SkBufferBlock* block = SkBufferBlock::Alloc(length);
+ fTail->fNext = block;
+ fTail = block;
+ }
+
+ fTail->fUsed += length;
+ this->validate();
+ // availData() now points just past the reservation; back up to its start.
+ return (char*)fTail->availData() - length;
+}
+
+#ifdef SK_DEBUG
+// Debug-only consistency check: either we have no storage at all, or the
+// block list accounts for fTotalUsed bytes and ends at fTail.
+void SkRWBuffer::validate() const {
+ if (fHead) {
+ fHead->validate(fTotalUsed, fTail);
+ } else {
+ SkASSERT(NULL == fTail);
+ SkASSERT(0 == fTotalUsed);
+ }
+}
+#endif
+
+// Snapshot exactly the bytes appended so far. Later appends only touch the
+// tail beyond fTotalUsed (or add new blocks), so the snapshot never sees them.
+SkROBuffer* SkRWBuffer::newRBufferSnapshot() const {
+ return SkNEW_ARGS(SkROBuffer, (fHead, fTotalUsed));
+}
+
+///////////////////////////////////////////////////////////////////////////////////////////////////
+
+// Adapts an SkROBuffer snapshot to the SkStreamAsset interface. Position is
+// tracked as fGlobalOffset (within the whole snapshot) plus fLocalOffset
+// (within the iterator's current block).
+class SkROBufferStreamAsset : public SkStreamAsset {
+ void validate() const {
+#ifdef SK_DEBUG
+ SkASSERT(fGlobalOffset <= fBuffer->size());
+ SkASSERT(fLocalOffset <= fIter.size());
+ SkASSERT(fLocalOffset <= fGlobalOffset);
+#endif
+ }
+
+#ifdef SK_DEBUG
+ // Checks stream invariants on entry and exit of a method via RAII.
+ class AutoValidate {
+ SkROBufferStreamAsset* fStream;
+ public:
+ AutoValidate(SkROBufferStreamAsset* stream) : fStream(stream) { stream->validate(); }
+ ~AutoValidate() { fStream->validate(); }
+ };
+ #define AUTO_VALIDATE AutoValidate av(this);
+#else
+ #define AUTO_VALIDATE
+#endif
+
+public:
+ // Takes its own ref on buffer; released in the destructor.
+ SkROBufferStreamAsset(const SkROBuffer* buffer) : fBuffer(SkRef(buffer)), fIter(buffer) {
+ fGlobalOffset = fLocalOffset = 0;
+ }
+
+ virtual ~SkROBufferStreamAsset() { fBuffer->unref(); }
+
+ size_t getLength() const override { return fBuffer->size(); }
+
+ bool rewind() override {
+ AUTO_VALIDATE
+ fIter.reset(fBuffer);
+ fGlobalOffset = fLocalOffset = 0;
+ return true;
+ }
+
+ // Read up to request bytes into dst; dst may be NULL to skip bytes.
+ // Returns the number of bytes actually consumed.
+ // NOTE(review): the first fIter.size() call assumes the iterator has a
+ // current block -- verify behavior for a snapshot of an empty buffer.
+ size_t read(void* dst, size_t request) override {
+ AUTO_VALIDATE
+ size_t bytesRead = 0;
+ for (;;) {
+ size_t size = fIter.size();
+ SkASSERT(fLocalOffset <= size);
+ size_t avail = SkTMin(size - fLocalOffset, request - bytesRead);
+ if (dst) {
+ memcpy(dst, (const char*)fIter.data() + fLocalOffset, avail);
+ dst = (char*)dst + avail;
+ }
+ bytesRead += avail;
+ fLocalOffset += avail;
+ SkASSERT(bytesRead <= request);
+ if (bytesRead == request) {
+ break;
+ }
+ // If we get here, we've exhausted the current iter
+ SkASSERT(fLocalOffset == size);
+ fLocalOffset = 0;
+ if (!fIter.next()) {
+ break; // ran out of data
+ }
+ }
+ fGlobalOffset += bytesRead;
+ SkASSERT(fGlobalOffset <= fBuffer->size());
+ return bytesRead;
+ }
+
+ bool isAtEnd() const override {
+ return fBuffer->size() == fGlobalOffset;
+ }
+
+ // New stream over the same snapshot, positioned at offset 0.
+ SkStreamAsset* duplicate() const override {
+ return SkNEW_ARGS(SkROBufferStreamAsset, (fBuffer));
+ }
+
+ // NOTE(review): getPosition/seek/move lack 'override' unlike the methods
+ // above -- presumably they override SkStream virtuals; confirm signatures.
+ size_t getPosition() const {
+ return fGlobalOffset;
+ }
+
+ // Seeking backward restarts from the beginning (blocks are forward-linked),
+ // then skips forward to position.
+ bool seek(size_t position) {
+ AUTO_VALIDATE
+ if (position < fGlobalOffset) {
+ this->rewind();
+ }
+ (void)this->skip(position - fGlobalOffset);
+ return true;
+ }
+
+ // Relative seek; clamps at offset 0 when moving before the start.
+ bool move(long offset) {
+ AUTO_VALIDATE
+ offset += fGlobalOffset;
+ if (offset <= 0) {
+ this->rewind();
+ } else {
+ (void)this->seek(SkToSizeT(offset));
+ }
+ return true;
+ }
+
+ // Duplicate, then match this stream's current position.
+ SkStreamAsset* fork() const override {
+ SkStreamAsset* clone = this->duplicate();
+ clone->seek(this->getPosition());
+ return clone;
+ }
+
+
+private:
+ const SkROBuffer* fBuffer;
+ SkROBuffer::Iter fIter;
+ size_t fLocalOffset; // offset within fIter's current block
+ size_t fGlobalOffset; // offset within the whole snapshot
+};
+
+// The stream takes its own ref on the SkROBuffer snapshot; SkAutoTUnref then
+// drops the ref returned by newRBufferSnapshot() when this scope exits.
+SkStreamAsset* SkRWBuffer::newStreamSnapshot() const {
+ SkAutoTUnref<SkROBuffer> buffer(this->newRBufferSnapshot());
+ return SkNEW_ARGS(SkROBufferStreamAsset, (buffer));
+}
diff --git a/src/core/SkRWBuffer.h b/src/core/SkRWBuffer.h
new file mode 100644
index 0000000000..89cb425a7f
--- /dev/null
+++ b/src/core/SkRWBuffer.h
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2015 Google Inc.
+ *
+ * Use of this source code is governed by a BSD-style license that can be
+ * found in the LICENSE file.
+ */
+
+#ifndef SkRWBuffer_DEFINED
+#define SkRWBuffer_DEFINED
+
+#include "SkRefCnt.h"
+
+struct SkBufferBlock;
+struct SkBufferHead;
+class SkRWBuffer;
+class SkStreamAsset;
+
+/**
+ * Contains a read-only, thread-sharable block of memory. To access the memory, the caller must
+ * instantiate a local iterator, as the memory is stored in 1 or more contiguous blocks.
+ */
+class SkROBuffer : public SkRefCnt {
+public:
+ /**
+ * Return the logical length of the data owned/shared by this buffer. It may be stored in
+ * multiple contiguous blocks, accessible via the iterator.
+ */
+ size_t size() const { return fUsed; }
+
+ /**
+ * Walks the contiguous blocks of memory backing this buffer. Each reader
+ * creates its own local Iter; the buffer itself is never mutated.
+ */
+ class Iter {
+ public:
+ Iter(const SkROBuffer*);
+
+ void reset(const SkROBuffer*);
+
+ /**
+ * Return the current continuous block of memory, or NULL if the iterator is exhausted
+ */
+ const void* data() const;
+
+ /**
+ * Returns the number of bytes in the current contiguous block of memory, or 0 if the
+ * iterator is exhausted.
+ */
+ size_t size() const;
+
+ /**
+ * Advance to the next contiguous block of memory, returning true if there is another
+ * block, or false if the iterator is exhausted.
+ */
+ bool next();
+
+ private:
+ const SkBufferBlock* fBlock;
+ size_t fRemaining;
+ };
+
+private:
+ // Only SkRWBuffer may construct snapshots; head may be NULL when used == 0.
+ SkROBuffer(const SkBufferHead* head, size_t used);
+ virtual ~SkROBuffer();
+
+ const SkBufferHead* fHead;
+ const size_t fUsed;
+
+ friend class SkRWBuffer;
+};
+
+/**
+ * Accumulates bytes of memory that are "appended" to it, growing internal storage as needed.
+ * The growth is done such that at any time, a RBuffer or StreamAsset can be snapped off, which
+ * can see the previously stored bytes, but which will be unaware of any future writes.
+ */
+class SkRWBuffer {
+public:
+ SkRWBuffer(size_t initialCapacity = 0);
+ ~SkRWBuffer();
+
+ size_t size() const { return fTotalUsed; }
+ // Copy length bytes from buffer onto the end.
+ void append(const void* buffer, size_t length);
+ // Reserve length contiguous bytes and return a writable pointer to them.
+ void* append(size_t length);
+
+ // Snapshots see only the bytes appended before they were taken, and may be
+ // read from other threads while this buffer continues to grow.
+ SkROBuffer* newRBufferSnapshot() const;
+ SkStreamAsset* newStreamSnapshot() const;
+
+#ifdef SK_DEBUG
+ void validate() const;
+#else
+ void validate() const {}
+#endif
+
+private:
+ SkBufferHead* fHead; // owns the ref shared with snapshots; NULL until first append
+ SkBufferBlock* fTail; // last block in the list (where appends go)
+ size_t fTotalUsed; // logical length in bytes
+};
+
+#endif
diff --git a/tests/DataRefTest.cpp b/tests/DataRefTest.cpp
index 099e9098cb..981ac54fcf 100644
--- a/tests/DataRefTest.cpp
+++ b/tests/DataRefTest.cpp
@@ -232,3 +232,84 @@ DEF_TEST(Data, reporter) {
test_cstring(reporter);
test_files(reporter);
}
+
+///////////////////////////////////////////////////////////////////////////////////////////////////
+#include "SkRWBuffer.h"
+
+const char gABC[] = "abcdefghijklmnopqrstuvwxyz";
+
+// Verify that buffer holds size/26 back-to-back copies of gABC.
+static void check_abcs(skiatest::Reporter* reporter, const char buffer[], size_t size) {
+ REPORTER_ASSERT(reporter, 0 == size % 26);
+ const char* stop = buffer + size;
+ for (const char* ptr = buffer; ptr < stop; ptr += 26) {
+ REPORTER_ASSERT(reporter, 0 == memcmp(ptr, gABC, 26));
+ }
+}
+
+// The stream should contain an integral number of copies of gABC.
+static void check_alphabet_stream(skiatest::Reporter* reporter, SkStream* stream) {
+ REPORTER_ASSERT(reporter, stream->hasLength());
+ size_t size = stream->getLength();
+ REPORTER_ASSERT(reporter, size % 26 == 0);
+
+ // Read everything forward in one call.
+ SkAutoTMalloc<char> storage(size);
+ char* array = storage.get();
+ size_t bytesRead = stream->read(array, size);
+ REPORTER_ASSERT(reporter, bytesRead == size);
+ check_abcs(reporter, array, size);
+
+ // try checking backwards: seek to each 26-byte boundary from the end,
+ // exercising the rewind-then-skip path of seek().
+ for (size_t offset = size; offset > 0; offset -= 26) {
+ REPORTER_ASSERT(reporter, stream->seek(offset - 26));
+ REPORTER_ASSERT(reporter, stream->getPosition() == offset - 26);
+ REPORTER_ASSERT(reporter, stream->read(array, 26) == 26);
+ check_abcs(reporter, array, 26);
+ REPORTER_ASSERT(reporter, stream->getPosition() == offset);
+ }
+}
+
+// The reader should contain an integral number of copies of gABC.
+static void check_alphabet_buffer(skiatest::Reporter* reporter, const SkROBuffer* reader) {
+ size_t size = reader->size();
+ REPORTER_ASSERT(reporter, size % 26 == 0);
+
+ // Reassemble the (possibly multi-block) buffer into one contiguous copy.
+ // NOTE(review): the do/while assumes a non-empty reader; holds here since
+ // every snapshot in this test is taken after at least one append.
+ SkAutoTMalloc<char> storage(size);
+ SkROBuffer::Iter iter(reader);
+ size_t offset = 0;
+ do {
+ SkASSERT(offset + iter.size() <= size);
+ memcpy(storage.get() + offset, iter.data(), iter.size());
+ offset += iter.size();
+ } while (iter.next());
+ REPORTER_ASSERT(reporter, offset == size);
+ check_abcs(reporter, storage.get(), size);
+}
+
+DEF_TEST(RWBuffer, reporter) {
+ // Knowing that the default capacity is 4096, choose N large enough so we force it to use
+ // multiple buffers internally.
+ const int N = 1000;
+ SkROBuffer* readers[N];
+ SkStream* streams[N];
+
+ // Scope the writer so the snapshots below outlive the SkRWBuffer itself.
+ {
+ SkRWBuffer buffer;
+ for (int i = 0; i < N; ++i) {
+ // Alternate between the copying append and the reserve-pointer append.
+ if (0 == (i & 1)) {
+ buffer.append(gABC, 26);
+ } else {
+ memcpy(buffer.append(26), gABC, 26);
+ }
+ readers[i] = buffer.newRBufferSnapshot();
+ streams[i] = buffer.newStreamSnapshot();
+ }
+ REPORTER_ASSERT(reporter, N*26 == buffer.size());
+ }
+
+ // Each snapshot sees exactly the data appended before it was taken.
+ for (int i = 0; i < N; ++i) {
+ REPORTER_ASSERT(reporter, (i + 1) * 26U == readers[i]->size());
+ check_alphabet_buffer(reporter, readers[i]);
+ check_alphabet_stream(reporter, streams[i]);
+ readers[i]->unref();
+ SkDELETE(streams[i]);
+ }
+}