aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport/chttp2/frame_data.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/transport/chttp2/frame_data.c')
-rw-r--r--src/core/transport/chttp2/frame_data.c99
1 files changed, 86 insertions, 13 deletions
diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c
index 07179a4571..e07fbb2cc7 100644
--- a/src/core/transport/chttp2/frame_data.c
+++ b/src/core/transport/chttp2/frame_data.c
@@ -45,12 +45,20 @@
grpc_chttp2_parse_error grpc_chttp2_data_parser_init(
grpc_chttp2_data_parser *parser) {
parser->state = GRPC_CHTTP2_DATA_FH_0;
- grpc_sopb_init(&parser->incoming_sopb);
+ parser->parsing_frame = NULL;
return GRPC_CHTTP2_PARSE_OK;
}
-void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser *parser) {
- grpc_sopb_destroy(&parser->incoming_sopb);
+void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_data_parser *parser) {
+ grpc_byte_stream *bs;
+ if (parser->parsing_frame) {
+ grpc_chttp2_incoming_byte_stream_finished(exec_ctx, parser->parsing_frame);
+ }
+ while (
+ (bs = grpc_chttp2_incoming_frame_queue_pop(&parser->incoming_frames))) {
+ grpc_byte_stream_destroy(bs);
+ }
}
grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame(
@@ -69,6 +77,62 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame(
return GRPC_CHTTP2_PARSE_OK;
}
+void grpc_chttp2_incoming_frame_queue_merge(
+ grpc_chttp2_incoming_frame_queue *head_dst,
+ grpc_chttp2_incoming_frame_queue *tail_src) {
+ if (tail_src->head == NULL) {
+ return;
+ }
+
+ if (head_dst->head == NULL) {
+ *head_dst = *tail_src;
+ memset(tail_src, 0, sizeof(*tail_src));
+ return;
+ }
+
+ head_dst->tail->next_message = tail_src->head;
+ head_dst->tail = tail_src->tail;
+ memset(tail_src, 0, sizeof(*tail_src));
+}
+
+grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop(
+ grpc_chttp2_incoming_frame_queue *q) {
+ grpc_byte_stream *out;
+ if (q->head == NULL) {
+ return NULL;
+ }
+ out = &q->head->base;
+ if (q->head == q->tail) {
+ memset(q, 0, sizeof(*q));
+ } else {
+ q->head = q->head->next_message;
+ }
+ return out;
+}
+
+void grpc_chttp2_encode_data(gpr_uint32 id, gpr_slice_buffer *inbuf,
+ gpr_uint32 write_bytes, int is_eof,
+ gpr_slice_buffer *outbuf) {
+ gpr_slice hdr;
+ gpr_uint8 *p;
+
+ hdr = gpr_slice_malloc(9);
+ p = GPR_SLICE_START_PTR(hdr);
+ GPR_ASSERT(write_bytes < 16777316);
+ *p++ = (gpr_uint8)(write_bytes >> 16);
+ *p++ = (gpr_uint8)(write_bytes >> 8);
+ *p++ = (gpr_uint8)(write_bytes);
+ *p++ = GRPC_CHTTP2_FRAME_DATA;
+ *p++ = is_eof ? GRPC_CHTTP2_DATA_FLAG_END_STREAM : 0;
+ *p++ = (gpr_uint8)(id >> 24);
+ *p++ = (gpr_uint8)(id >> 16);
+ *p++ = (gpr_uint8)(id >> 8);
+ *p++ = (gpr_uint8)(id);
+ gpr_slice_buffer_add(outbuf, hdr);
+
+ gpr_slice_buffer_move_first(inbuf, write_bytes, outbuf);
+}
+
grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_parsing *transport_parsing,
@@ -77,7 +141,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
gpr_uint8 *const end = GPR_SLICE_END_PTR(slice);
gpr_uint8 *cur = beg;
grpc_chttp2_data_parser *p = parser;
- gpr_uint32 message_flags = 0;
+ gpr_uint32 message_flags;
+ grpc_chttp2_incoming_byte_stream *incoming_byte_stream;
if (is_last && p->is_last_frame) {
stream_parsing->received_close = 1;
@@ -132,11 +197,14 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
p->frame_size |= ((gpr_uint32)*cur);
p->state = GRPC_CHTTP2_DATA_FRAME;
++cur;
+ message_flags = 0;
if (p->is_frame_compressed) {
message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
}
- grpc_sopb_add_begin_message(&p->incoming_sopb, p->frame_size,
- message_flags);
+ p->parsing_frame = incoming_byte_stream =
+ grpc_chttp2_incoming_byte_stream_create(
+ exec_ctx, transport_parsing, stream_parsing, p->frame_size,
+ message_flags, &p->incoming_frames);
/* fallthrough */
case GRPC_CHTTP2_DATA_FRAME:
if (cur == end) {
@@ -147,20 +215,25 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing,
stream_parsing);
if ((gpr_uint32)(end - cur) == p->frame_size) {
- grpc_sopb_add_slice(
- &p->incoming_sopb,
+ grpc_chttp2_incoming_byte_stream_push(
+ exec_ctx, p->parsing_frame,
gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
+ grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame);
+ p->parsing_frame = NULL;
p->state = GRPC_CHTTP2_DATA_FH_0;
return GRPC_CHTTP2_PARSE_OK;
} else if ((gpr_uint32)(end - cur) > p->frame_size) {
- grpc_sopb_add_slice(&p->incoming_sopb,
- gpr_slice_sub(slice, (size_t)(cur - beg),
- (size_t)(cur + p->frame_size - beg)));
+ grpc_chttp2_incoming_byte_stream_push(
+ exec_ctx, p->parsing_frame,
+ gpr_slice_sub(slice, (size_t)(cur - beg),
+ (size_t)(cur + p->frame_size - beg)));
+ grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame);
+ p->parsing_frame = NULL;
cur += p->frame_size;
goto fh_0; /* loop */
} else {
- grpc_sopb_add_slice(
- &p->incoming_sopb,
+ grpc_chttp2_incoming_byte_stream_push(
+ exec_ctx, p->parsing_frame,
gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
GPR_ASSERT((size_t)(end - cur) <= p->frame_size);
p->frame_size -= (gpr_uint32)(end - cur);