aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/transport/byte_stream.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/transport/byte_stream.h')
-rw-r--r--src/core/lib/transport/byte_stream.h99
1 files changed, 78 insertions, 21 deletions
diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h
index f172296e4b..1e1e8310b8 100644
--- a/src/core/lib/transport/byte_stream.h
+++ b/src/core/lib/transport/byte_stream.h
@@ -28,52 +28,109 @@
/** Mask of all valid internal flags. */
#define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS)
-struct grpc_byte_stream;
typedef struct grpc_byte_stream grpc_byte_stream;
-struct grpc_byte_stream {
- uint32_t length;
- uint32_t flags;
+typedef struct {
bool (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
size_t max_size_hint, grpc_closure *on_complete);
grpc_error *(*pull)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
grpc_slice *slice);
+ void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
+ grpc_error *error);
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream);
+} grpc_byte_stream_vtable;
+
+struct grpc_byte_stream {
+ uint32_t length;
+ uint32_t flags;
+ const grpc_byte_stream_vtable *vtable;
};
-/* returns 1 if the bytes are available immediately (in which case
- * on_complete will not be called), 0 if the bytes will be available
- * asynchronously.
- *
- * max_size_hint can be set as a hint as to the maximum number
- * of bytes that would be acceptable to read.
- */
-int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
- grpc_byte_stream *byte_stream, size_t max_size_hint,
- grpc_closure *on_complete);
+// Returns true if the bytes are available immediately (in which case
+// on_complete will not be called), false if the bytes will be available
+// asynchronously.
+//
+// max_size_hint can be set as a hint as to the maximum number
+// of bytes that would be acceptable to read.
+bool grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream, size_t max_size_hint,
+ grpc_closure *on_complete);
-/* returns the next slice in the byte stream when it is ready (indicated by
- * either grpc_byte_stream_next returning 1 or on_complete passed to
- * grpc_byte_stream_next is called).
- *
- * once a slice is returned into *slice, it is owned by the caller.
- */
+// Returns the next slice in the byte stream when it is ready (indicated by
+// either grpc_byte_stream_next returning true or on_complete passed to
+// grpc_byte_stream_next is called).
+//
+// Once a slice is returned into *slice, it is owned by the caller.
grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
grpc_slice *slice);
+// Shuts down the byte stream.
+//
+// If there is a pending call to on_complete from grpc_byte_stream_next(),
+// it will be invoked with the error passed to grpc_byte_stream_shutdown().
+//
+// The next call to grpc_byte_stream_pull() (if any) will return the error
+// passed to grpc_byte_stream_shutdown().
+void grpc_byte_stream_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream *byte_stream,
+ grpc_error *error);
+
void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream);
-/* grpc_byte_stream that wraps a slice buffer */
+// grpc_slice_buffer_stream
+//
+// A grpc_byte_stream that wraps a slice buffer.
+
typedef struct grpc_slice_buffer_stream {
grpc_byte_stream base;
grpc_slice_buffer *backing_buffer;
size_t cursor;
+ grpc_error *shutdown_error;
} grpc_slice_buffer_stream;
void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream,
grpc_slice_buffer *slice_buffer,
uint32_t flags);
+// grpc_caching_byte_stream
+//
+// A grpc_byte_stream that that wraps an underlying byte stream but caches
+// the resulting slices in a slice buffer. If an initial attempt fails
+// without fully draining the underlying stream, a new caching stream
+// can be created from the same underlying cache, in which case it will
+// return whatever is in the backing buffer before continuing to read the
+// underlying stream.
+//
+// NOTE: No synchronization is done, so it is not safe to have multiple
+// grpc_caching_byte_streams simultaneously drawing from the same underlying
+// grpc_byte_stream_cache at the same time.
+
+typedef struct {
+ grpc_byte_stream *underlying_stream;
+ grpc_slice_buffer cache_buffer;
+} grpc_byte_stream_cache;
+
+// Takes ownership of underlying_stream.
+void grpc_byte_stream_cache_init(grpc_byte_stream_cache *cache,
+ grpc_byte_stream *underlying_stream);
+
+// Must not be called while still in use by a grpc_caching_byte_stream.
+void grpc_byte_stream_cache_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_byte_stream_cache *cache);
+
+typedef struct {
+ grpc_byte_stream base;
+ grpc_byte_stream_cache *cache;
+ size_t cursor;
+ grpc_error *shutdown_error;
+} grpc_caching_byte_stream;
+
+void grpc_caching_byte_stream_init(grpc_caching_byte_stream *stream,
+ grpc_byte_stream_cache *cache);
+
+// Resets the byte stream to the start of the underlying stream.
+void grpc_caching_byte_stream_reset(grpc_caching_byte_stream *stream);
+
#endif /* GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H */