aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/transport/byte_stream.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/transport/byte_stream.cc')
-rw-r--r--src/core/lib/transport/byte_stream.cc208
1 files changed, 92 insertions, 116 deletions
diff --git a/src/core/lib/transport/byte_stream.cc b/src/core/lib/transport/byte_stream.cc
index e1751f8010..cb15a71a91 100644
--- a/src/core/lib/transport/byte_stream.cc
+++ b/src/core/lib/transport/byte_stream.cc
@@ -25,160 +25,136 @@
#include <grpc/support/log.h>
+#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/slice/slice_internal.h"
-bool grpc_byte_stream_next(grpc_byte_stream* byte_stream, size_t max_size_hint,
- grpc_closure* on_complete) {
- return byte_stream->vtable->next(byte_stream, max_size_hint, on_complete);
-}
+namespace grpc_core {
-grpc_error* grpc_byte_stream_pull(grpc_byte_stream* byte_stream,
- grpc_slice* slice) {
- return byte_stream->vtable->pull(byte_stream, slice);
-}
+//
+// SliceBufferByteStream
+//
-void grpc_byte_stream_shutdown(grpc_byte_stream* byte_stream,
- grpc_error* error) {
- byte_stream->vtable->shutdown(byte_stream, error);
+SliceBufferByteStream::SliceBufferByteStream(grpc_slice_buffer* slice_buffer,
+ uint32_t flags)
+ : ByteStream(static_cast<uint32_t>(slice_buffer->length), flags) {
+ GPR_ASSERT(slice_buffer->length <= UINT32_MAX);
+ grpc_slice_buffer_init(&backing_buffer_);
+ grpc_slice_buffer_swap(slice_buffer, &backing_buffer_);
}
-void grpc_byte_stream_destroy(grpc_byte_stream* byte_stream) {
- byte_stream->vtable->destroy(byte_stream);
-}
+SliceBufferByteStream::~SliceBufferByteStream() {}
-// grpc_slice_buffer_stream
+void SliceBufferByteStream::Orphan() {
+ grpc_slice_buffer_destroy(&backing_buffer_);
+ GRPC_ERROR_UNREF(shutdown_error_);
+ // Note: We do not actually delete the object here, since
+ // SliceBufferByteStream is usually allocated as part of a larger
+ // object and has an OrphanablePtr of itself passed down through the
+ // filter stack.
+}
-static bool slice_buffer_stream_next(grpc_byte_stream* byte_stream,
- size_t max_size_hint,
- grpc_closure* on_complete) {
- grpc_slice_buffer_stream* stream =
- reinterpret_cast<grpc_slice_buffer_stream*>(byte_stream);
- GPR_ASSERT(stream->cursor < stream->backing_buffer.count);
+bool SliceBufferByteStream::Next(size_t max_size_hint,
+ grpc_closure* on_complete) {
+ GPR_ASSERT(cursor_ < backing_buffer_.count);
return true;
}
-static grpc_error* slice_buffer_stream_pull(grpc_byte_stream* byte_stream,
- grpc_slice* slice) {
- grpc_slice_buffer_stream* stream =
- reinterpret_cast<grpc_slice_buffer_stream*>(byte_stream);
- if (stream->shutdown_error != GRPC_ERROR_NONE) {
- return GRPC_ERROR_REF(stream->shutdown_error);
+grpc_error* SliceBufferByteStream::Pull(grpc_slice* slice) {
+ if (shutdown_error_ != GRPC_ERROR_NONE) {
+ return GRPC_ERROR_REF(shutdown_error_);
}
- GPR_ASSERT(stream->cursor < stream->backing_buffer.count);
- *slice =
- grpc_slice_ref_internal(stream->backing_buffer.slices[stream->cursor]);
- stream->cursor++;
+ GPR_ASSERT(cursor_ < backing_buffer_.count);
+ *slice = grpc_slice_ref_internal(backing_buffer_.slices[cursor_]);
+ ++cursor_;
return GRPC_ERROR_NONE;
}
-static void slice_buffer_stream_shutdown(grpc_byte_stream* byte_stream,
- grpc_error* error) {
- grpc_slice_buffer_stream* stream =
- reinterpret_cast<grpc_slice_buffer_stream*>(byte_stream);
- GRPC_ERROR_UNREF(stream->shutdown_error);
- stream->shutdown_error = error;
+void SliceBufferByteStream::Shutdown(grpc_error* error) {
+ GRPC_ERROR_UNREF(shutdown_error_);
+ shutdown_error_ = error;
}
-static void slice_buffer_stream_destroy(grpc_byte_stream* byte_stream) {
- grpc_slice_buffer_stream* stream =
- reinterpret_cast<grpc_slice_buffer_stream*>(byte_stream);
- grpc_slice_buffer_destroy(&stream->backing_buffer);
- GRPC_ERROR_UNREF(stream->shutdown_error);
+//
+// ByteStreamCache
+//
+
+ByteStreamCache::ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream)
+ : underlying_stream_(std::move(underlying_stream)),
+ length_(underlying_stream_->length()),
+ flags_(underlying_stream_->flags()) {
+ grpc_slice_buffer_init(&cache_buffer_);
}
-static const grpc_byte_stream_vtable slice_buffer_stream_vtable = {
- slice_buffer_stream_next, slice_buffer_stream_pull,
- slice_buffer_stream_shutdown, slice_buffer_stream_destroy};
+ByteStreamCache::~ByteStreamCache() { Destroy(); }
-void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream* stream,
- grpc_slice_buffer* slice_buffer,
- uint32_t flags) {
- GPR_ASSERT(slice_buffer->length <= UINT32_MAX);
- stream->base.length = static_cast<uint32_t>(slice_buffer->length);
- stream->base.flags = flags;
- stream->base.vtable = &slice_buffer_stream_vtable;
- grpc_slice_buffer_init(&stream->backing_buffer);
- grpc_slice_buffer_swap(slice_buffer, &stream->backing_buffer);
- stream->cursor = 0;
- stream->shutdown_error = GRPC_ERROR_NONE;
+void ByteStreamCache::Destroy() {
+ underlying_stream_.reset();
+ if (cache_buffer_.length > 0) {
+ grpc_slice_buffer_destroy_internal(&cache_buffer_);
+ }
}
-// grpc_caching_byte_stream
+//
+// ByteStreamCache::CachingByteStream
+//
-void grpc_byte_stream_cache_init(grpc_byte_stream_cache* cache,
- grpc_byte_stream* underlying_stream) {
- cache->underlying_stream = underlying_stream;
- grpc_slice_buffer_init(&cache->cache_buffer);
-}
+ByteStreamCache::CachingByteStream::CachingByteStream(ByteStreamCache* cache)
+ : ByteStream(cache->length_, cache->flags_), cache_(cache) {}
-void grpc_byte_stream_cache_destroy(grpc_byte_stream_cache* cache) {
- grpc_byte_stream_destroy(cache->underlying_stream);
- grpc_slice_buffer_destroy_internal(&cache->cache_buffer);
+ByteStreamCache::CachingByteStream::~CachingByteStream() {}
+
+void ByteStreamCache::CachingByteStream::Orphan() {
+ GRPC_ERROR_UNREF(shutdown_error_);
+ // Note: We do not actually delete the object here, since
+ // CachingByteStream is usually allocated as part of a larger
+ // object and has an OrphanablePtr of itself passed down through the
+ // filter stack.
}
-static bool caching_byte_stream_next(grpc_byte_stream* byte_stream,
- size_t max_size_hint,
- grpc_closure* on_complete) {
- grpc_caching_byte_stream* stream =
- reinterpret_cast<grpc_caching_byte_stream*>(byte_stream);
- if (stream->shutdown_error != GRPC_ERROR_NONE) return true;
- if (stream->cursor < stream->cache->cache_buffer.count) return true;
- return grpc_byte_stream_next(stream->cache->underlying_stream, max_size_hint,
- on_complete);
+bool ByteStreamCache::CachingByteStream::Next(size_t max_size_hint,
+ grpc_closure* on_complete) {
+ if (shutdown_error_ != GRPC_ERROR_NONE) return true;
+ if (cursor_ < cache_->cache_buffer_.count) return true;
+ GPR_ASSERT(cache_->underlying_stream_ != nullptr);
+ return cache_->underlying_stream_->Next(max_size_hint, on_complete);
}
-static grpc_error* caching_byte_stream_pull(grpc_byte_stream* byte_stream,
- grpc_slice* slice) {
- grpc_caching_byte_stream* stream =
- reinterpret_cast<grpc_caching_byte_stream*>(byte_stream);
- if (stream->shutdown_error != GRPC_ERROR_NONE) {
- return GRPC_ERROR_REF(stream->shutdown_error);
+grpc_error* ByteStreamCache::CachingByteStream::Pull(grpc_slice* slice) {
+ if (shutdown_error_ != GRPC_ERROR_NONE) {
+ return GRPC_ERROR_REF(shutdown_error_);
}
- if (stream->cursor < stream->cache->cache_buffer.count) {
- *slice = grpc_slice_ref_internal(
- stream->cache->cache_buffer.slices[stream->cursor]);
- ++stream->cursor;
+ if (cursor_ < cache_->cache_buffer_.count) {
+ *slice = grpc_slice_ref_internal(cache_->cache_buffer_.slices[cursor_]);
+ ++cursor_;
+ offset_ += GRPC_SLICE_LENGTH(*slice);
return GRPC_ERROR_NONE;
}
- grpc_error* error =
- grpc_byte_stream_pull(stream->cache->underlying_stream, slice);
+ GPR_ASSERT(cache_->underlying_stream_ != nullptr);
+ grpc_error* error = cache_->underlying_stream_->Pull(slice);
if (error == GRPC_ERROR_NONE) {
- ++stream->cursor;
- grpc_slice_buffer_add(&stream->cache->cache_buffer,
+ grpc_slice_buffer_add(&cache_->cache_buffer_,
grpc_slice_ref_internal(*slice));
+ ++cursor_;
+ offset_ += GRPC_SLICE_LENGTH(*slice);
+ // Orphan the underlying stream if it's been drained.
+ if (offset_ == cache_->underlying_stream_->length()) {
+ cache_->underlying_stream_.reset();
+ }
}
return error;
}
-static void caching_byte_stream_shutdown(grpc_byte_stream* byte_stream,
- grpc_error* error) {
- grpc_caching_byte_stream* stream =
- reinterpret_cast<grpc_caching_byte_stream*>(byte_stream);
- GRPC_ERROR_UNREF(stream->shutdown_error);
- stream->shutdown_error = GRPC_ERROR_REF(error);
- grpc_byte_stream_shutdown(stream->cache->underlying_stream, error);
-}
-
-static void caching_byte_stream_destroy(grpc_byte_stream* byte_stream) {
- grpc_caching_byte_stream* stream =
- reinterpret_cast<grpc_caching_byte_stream*>(byte_stream);
- GRPC_ERROR_UNREF(stream->shutdown_error);
+void ByteStreamCache::CachingByteStream::Shutdown(grpc_error* error) {
+ GRPC_ERROR_UNREF(shutdown_error_);
+ shutdown_error_ = GRPC_ERROR_REF(error);
+ if (cache_->underlying_stream_ != nullptr) {
+ cache_->underlying_stream_->Shutdown(error);
+ }
}
-static const grpc_byte_stream_vtable caching_byte_stream_vtable = {
- caching_byte_stream_next, caching_byte_stream_pull,
- caching_byte_stream_shutdown, caching_byte_stream_destroy};
-
-void grpc_caching_byte_stream_init(grpc_caching_byte_stream* stream,
- grpc_byte_stream_cache* cache) {
- memset(stream, 0, sizeof(*stream));
- stream->base.length = cache->underlying_stream->length;
- stream->base.flags = cache->underlying_stream->flags;
- stream->base.vtable = &caching_byte_stream_vtable;
- stream->cache = cache;
- stream->shutdown_error = GRPC_ERROR_NONE;
+void ByteStreamCache::CachingByteStream::Reset() {
+ cursor_ = 0;
+ offset_ = 0;
}
-void grpc_caching_byte_stream_reset(grpc_caching_byte_stream* stream) {
- stream->cursor = 0;
-}
+} // namespace grpc_core