/* * * Copyright 2015 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #ifndef GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H #define GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H #include #include #include "src/core/lib/gprpp/abstract.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/iomgr/closure.h" /** Internal bit flag for grpc_begin_message's \a flags signaling the use of * compression for the message */ #define GRPC_WRITE_INTERNAL_COMPRESS (0x80000000u) /** Mask of all valid internal flags. */ #define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS) namespace grpc_core { class ByteStream : public Orphanable { public: virtual ~ByteStream() {} // Returns true if the bytes are available immediately (in which case // on_complete will not be called), or false if the bytes will be available // asynchronously (in which case on_complete will be called when they // are available). // // max_size_hint can be set as a hint as to the maximum number // of bytes that would be acceptable to read. virtual bool Next(size_t max_size_hint, grpc_closure* on_complete) GRPC_ABSTRACT; // Returns the next slice in the byte stream when it is available, as // indicated by Next(). // // Once a slice is returned into *slice, it is owned by the caller. virtual grpc_error* Pull(grpc_slice* slice) GRPC_ABSTRACT; // Shuts down the byte stream. // // If there is a pending call to on_complete from Next(), it will be // invoked with the error passed to Shutdown(). // // The next call to Pull() (if any) will return the error passed to // Shutdown(). virtual void Shutdown(grpc_error* error) GRPC_ABSTRACT; uint32_t length() const { return length_; } uint32_t flags() const { return flags_; } void set_flags(uint32_t flags) { flags_ = flags; } GRPC_ABSTRACT_BASE_CLASS protected: ByteStream(uint32_t length, uint32_t flags) : length_(length), flags_(flags) {} private: const uint32_t length_; uint32_t flags_; }; // // SliceBufferByteStream // // A ByteStream that wraps a slice buffer. // class SliceBufferByteStream : public ByteStream { public: // Removes all slices in slice_buffer, leaving it empty. SliceBufferByteStream(grpc_slice_buffer* slice_buffer, uint32_t flags); ~SliceBufferByteStream(); void Orphan() override; bool Next(size_t max_size_hint, grpc_closure* on_complete) override; grpc_error* Pull(grpc_slice* slice) override; void Shutdown(grpc_error* error) override; private: grpc_slice_buffer backing_buffer_; size_t cursor_ = 0; grpc_error* shutdown_error_ = GRPC_ERROR_NONE; }; // // CachingByteStream // // A ByteStream that that wraps an underlying byte stream but caches // the resulting slices in a slice buffer. If an initial attempt fails // without fully draining the underlying stream, a new caching stream // can be created from the same underlying cache, in which case it will // return whatever is in the backing buffer before continuing to read the // underlying stream. // // NOTE: No synchronization is done, so it is not safe to have multiple // CachingByteStreams simultaneously drawing from the same underlying // ByteStreamCache at the same time. // class ByteStreamCache { public: class CachingByteStream : public ByteStream { public: explicit CachingByteStream(ByteStreamCache* cache); ~CachingByteStream(); void Orphan() override; bool Next(size_t max_size_hint, grpc_closure* on_complete) override; grpc_error* Pull(grpc_slice* slice) override; void Shutdown(grpc_error* error) override; // Resets the byte stream to the start of the underlying stream. void Reset(); private: ByteStreamCache* cache_; size_t cursor_ = 0; size_t offset_ = 0; grpc_error* shutdown_error_ = GRPC_ERROR_NONE; }; explicit ByteStreamCache(OrphanablePtr underlying_stream); ~ByteStreamCache(); // Must not be destroyed while still in use by a CachingByteStream. void Destroy(); grpc_slice_buffer* cache_buffer() { return &cache_buffer_; } private: OrphanablePtr underlying_stream_; uint32_t length_; uint32_t flags_; grpc_slice_buffer cache_buffer_; }; } // namespace grpc_core #endif /* GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H */