aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/transport/byte_stream.h
blob: d3e04df5c06b13d993787f36ecec0a2da0116374 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#ifndef GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H
#define GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H

#include <grpc/slice_buffer.h>
#include "src/core/lib/iomgr/exec_ctx.h"

/** Internal-only bit in grpc_begin_message's \a flags that marks the message
 * as using compression.  It is the most significant bit of a 32-bit flags
 * word (value 0x80000000). */
#define GRPC_WRITE_INTERNAL_COMPRESS (1u << 31)
/** Union of every valid internal flag bit; currently only the compression
 * bit. */
#define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS)

#ifdef __cplusplus
extern "C" {
#endif

// Forward declaration: lets grpc_byte_stream_vtable refer to the stream type
// before struct grpc_byte_stream is defined further down in this header.
typedef struct grpc_byte_stream grpc_byte_stream;

// Table of operations supplied by a concrete byte stream type.
//
// Each entry has the same signature as the correspondingly named
// grpc_byte_stream_next/pull/shutdown/destroy function declared in this
// header.
// NOTE(review): presumably those functions dispatch through this table --
// confirm against the implementation file.
typedef struct {
  bool (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
               size_t max_size_hint, grpc_closure *on_complete);
  grpc_error *(*pull)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
                      grpc_slice *slice);
  void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
                   grpc_error *error);
  void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream);
} grpc_byte_stream_vtable;

// Common header shared by byte streams.  The concrete stream structs in this
// header (grpc_slice_buffer_stream, grpc_caching_byte_stream) embed it as
// their first member, named base.
struct grpc_byte_stream {
  // NOTE(review): presumably the total number of bytes in the stream --
  // confirm.
  uint32_t length;
  // NOTE(review): presumably the write flags for the message, possibly
  // including the GRPC_WRITE_INTERNAL_* bits defined above -- confirm.
  uint32_t flags;
  // Operations table for the concrete stream type.
  const grpc_byte_stream_vtable *vtable;
};

// Requests the next chunk of bytes from byte_stream.
//
// Returns true if the bytes are available immediately (in which case
// on_complete will not be called), false if the bytes will be available
// asynchronously (in which case on_complete is called once they are ready).
//
// max_size_hint can be set as a hint as to the maximum number
// of bytes that would be acceptable to read.
//
// Once the bytes are ready, retrieve them with grpc_byte_stream_pull().
bool grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
                           grpc_byte_stream *byte_stream, size_t max_size_hint,
                           grpc_closure *on_complete);

// Returns the next slice in the byte stream when it is ready (indicated by
// either grpc_byte_stream_next returning true or on_complete passed to
// grpc_byte_stream_next is called).
//
// Once a slice is returned into *slice, it is owned by the caller.
//
// If grpc_byte_stream_shutdown() was called beforehand, the error passed to
// it is returned instead.
// NOTE(review): the success return value is not stated in this header
// (presumably GRPC_ERROR_NONE) -- confirm.
grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx,
                                  grpc_byte_stream *byte_stream,
                                  grpc_slice *slice);

// Shuts down the byte stream.
//
// If there is a pending call to on_complete from grpc_byte_stream_next(),
// it will be invoked with the error passed to grpc_byte_stream_shutdown().
//
// The next call to grpc_byte_stream_pull() (if any) will return the error
// passed to grpc_byte_stream_shutdown().
//
// NOTE(review): this header does not say whether ownership of error is
// transferred to the stream -- confirm against the implementations.
void grpc_byte_stream_shutdown(grpc_exec_ctx *exec_ctx,
                               grpc_byte_stream *byte_stream,
                               grpc_error *error);

// Destroys the byte stream.
//
// NOTE(review): what is released (and whether the memory of byte_stream
// itself is freed) presumably depends on the concrete stream's destroy
// operation -- confirm against the implementations.
void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
                              grpc_byte_stream *byte_stream);

// grpc_slice_buffer_stream
//
// A grpc_byte_stream that wraps a slice buffer.  The stream takes
// ownership of the slices in the buffer, and on destruction will
// reset the contents of the buffer.

typedef struct grpc_slice_buffer_stream {
  // Embedded common stream header.  It is the first member, so a pointer to
  // this struct can be converted to a grpc_byte_stream *.
  grpc_byte_stream base;
  // The slice buffer wrapped by this stream (not embedded; held by pointer).
  grpc_slice_buffer *backing_buffer;
  // NOTE(review): presumably the index of the next slice in backing_buffer
  // to hand out -- confirm.
  size_t cursor;
  // NOTE(review): presumably the error recorded by shutdown and reported by
  // subsequent pulls -- confirm.
  grpc_error *shutdown_error;
} grpc_slice_buffer_stream;

// Initializes stream as a byte stream wrapping slice_buffer.
//
// NOTE(review): flags is presumably stored in stream->base.flags, and
// slice_buffer presumably must outlive the stream since it is held by
// pointer -- confirm against the implementation.
void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream,
                                   grpc_slice_buffer *slice_buffer,
                                   uint32_t flags);

// grpc_caching_byte_stream
//
// A grpc_byte_stream that wraps an underlying byte stream but caches
// the resulting slices in a slice buffer.  If an initial attempt fails
// without fully draining the underlying stream, a new caching stream
// can be created from the same underlying cache, in which case it will
// return whatever is in the backing buffer before continuing to read the
// underlying stream.
//
// NOTE: No synchronization is done, so it is not safe to have multiple
// grpc_caching_byte_streams drawing from the same underlying
// grpc_byte_stream_cache at the same time.

// Cache shared by the grpc_caching_byte_streams created from it: the
// underlying stream plus the slices already read from that stream.
typedef struct {
  // The wrapped stream; grpc_byte_stream_cache_init() takes ownership of it.
  grpc_byte_stream *underlying_stream;
  // Slices cached from underlying_stream.  Embedded by value, not by pointer.
  grpc_slice_buffer cache_buffer;
} grpc_byte_stream_cache;

// Initializes cache to wrap underlying_stream.
// Takes ownership of underlying_stream.
void grpc_byte_stream_cache_init(grpc_byte_stream_cache *cache,
                                 grpc_byte_stream *underlying_stream);

// Destroys cache.
// Must not be called while still in use by a grpc_caching_byte_stream.
//
// NOTE(review): because the cache owns its underlying stream (see
// grpc_byte_stream_cache_init), this presumably destroys that stream and
// the cached slices as well -- confirm against the implementation.
void grpc_byte_stream_cache_destroy(grpc_exec_ctx *exec_ctx,
                                    grpc_byte_stream_cache *cache);

// A byte stream that reads through a grpc_byte_stream_cache.
typedef struct {
  // Embedded common stream header.  It is the first member, so a pointer to
  // this struct can be converted to a grpc_byte_stream *.
  grpc_byte_stream base;
  // The cache this stream draws from (held by pointer, not owned per the
  // "must not be destroyed while in use" rule on the cache).
  grpc_byte_stream_cache *cache;
  // NOTE(review): presumably this stream's read position within the cache's
  // buffered slices -- confirm.
  size_t cursor;
  // NOTE(review): presumably the error recorded by shutdown and reported by
  // subsequent pulls -- confirm.
  grpc_error *shutdown_error;
} grpc_caching_byte_stream;

// Initializes stream as a caching byte stream drawing from cache.
//
// cache must already have been set up with grpc_byte_stream_cache_init().
// NOTE(review): presumably cache must outlive stream since it is held by
// pointer -- confirm.
void grpc_caching_byte_stream_init(grpc_caching_byte_stream *stream,
                                   grpc_byte_stream_cache *cache);

// Resets the byte stream to the start of the underlying stream.
//
// NOTE(review): presumably this rewinds the stream's cursor so that slices
// already held in the cache are returned again before any further reads from
// the underlying stream -- confirm against the implementation.
void grpc_caching_byte_stream_reset(grpc_caching_byte_stream *stream);

#ifdef __cplusplus
}
#endif

#endif /* GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H */