diff options
Diffstat (limited to 'src/core/transport/chttp2/stream_encoder.c')
-rw-r--r-- | src/core/transport/chttp2/stream_encoder.c | 130 |
1 files changed, 84 insertions, 46 deletions
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c index 8595a59879..92a36d0c16 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/stream_encoder.c @@ -68,8 +68,6 @@ typedef struct { gpr_uint8 last_was_header; /* output stream id */ gpr_uint32 stream_id; - /* number of flow controlled bytes written */ - gpr_uint32 output_size; gpr_slice_buffer *output; } framer_state; @@ -464,49 +462,31 @@ void grpc_chttp2_hpack_compressor_destroy(grpc_chttp2_hpack_compressor *c) { grpc_mdstr_unref(c->timeout_key_str); } -gpr_uint32 grpc_chttp2_encode_some(grpc_stream_op *ops, size_t *ops_count, - int eof, gpr_slice_buffer *output, - gpr_uint32 max_bytes, gpr_uint32 stream_id, - grpc_chttp2_hpack_compressor *compressor) { - framer_state st; +gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count, + gpr_uint32 max_flow_controlled_bytes, + grpc_stream_op_buffer *outops) { gpr_slice slice; grpc_stream_op *op; gpr_uint32 max_take_size; + gpr_uint32 flow_controlled_bytes_taken = 0; gpr_uint32 curop = 0; - gpr_uint32 nops = *ops_count; gpr_uint8 *p; - GPR_ASSERT(stream_id != 0); - - st.cur_frame_type = NONE; - st.last_was_header = 0; - st.stream_id = stream_id; - st.output = output; - st.output_size = 0; - - while (curop < nops) { - GPR_ASSERT(st.output_size <= max_bytes); - op = &ops[curop]; + while (curop < *inops_count) { + GPR_ASSERT(flow_controlled_bytes_taken <= max_flow_controlled_bytes); + op = &inops[curop]; switch (op->type) { case GRPC_NO_OP: + /* skip */ curop++; break; case GRPC_OP_FLOW_CTL_CB: - op->data.flow_ctl_cb.cb(op->data.flow_ctl_cb.arg, GRPC_OP_OK); - curop++; - break; - case GRPC_OP_METADATA: - hpack_enc(compressor, op->data.metadata, &st); - curop++; - break; case GRPC_OP_DEADLINE: - deadline_enc(compressor, op->data.deadline, &st); - curop++; - break; + case GRPC_OP_METADATA: case GRPC_OP_METADATA_BOUNDARY: - ensure_frame_type(&st, HEADER, 0); - finish_frame(&st, 1, 0); - st.last_was_header = 0; /* force a new header frame */ + /* these just get copied as they don't impact the number of flow + controlled bytes */ + grpc_sopb_append(outops, op, 1); curop++; break; case GRPC_OP_BEGIN_MESSAGE: @@ -525,42 +505,100 @@ gpr_uint32 grpc_chttp2_encode_some(grpc_stream_op *ops, size_t *ops_count, case GRPC_OP_SLICE: slice = op->data.slice; if (!GPR_SLICE_LENGTH(slice)) { + /* skip zero length slices */ + gpr_slice_unref(slice); curop++; break; } - if (st.output_size == max_bytes) { + max_take_size = max_flow_controlled_bytes - flow_controlled_bytes_taken; + if (max_take_size == 0) { goto exit_loop; } + if (GPR_SLICE_LENGTH(slice) > max_take_size) { + slice = gpr_slice_split_head(&op->data.slice, max_take_size); + grpc_sopb_add_slice(outops, slice); + } else { + /* consume this op immediately */ + grpc_sopb_append(outops, op, 1); + curop++; + } + flow_controlled_bytes_taken += GPR_SLICE_LENGTH(slice); + break; + } + } +exit_loop: + *inops_count -= curop; + memmove(inops, inops + curop, *inops_count * sizeof(grpc_stream_op)); + + return flow_controlled_bytes_taken; +} + +void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, + gpr_uint32 stream_id, + grpc_chttp2_hpack_compressor *compressor, + gpr_slice_buffer *output) { + framer_state st; + gpr_slice slice; + grpc_stream_op *op; + gpr_uint32 max_take_size; + gpr_uint32 curop = 0; + + GPR_ASSERT(stream_id != 0); + + st.cur_frame_type = NONE; + st.last_was_header = 0; + st.stream_id = stream_id; + st.output = output; + + while (curop < ops_count) { + op = &ops[curop]; + switch (op->type) { + case GRPC_NO_OP: + case GRPC_OP_BEGIN_MESSAGE: + gpr_log( + GPR_ERROR, + "These stream ops should be filtered out by grpc_chttp2_preencode"); + abort(); + case GRPC_OP_FLOW_CTL_CB: + op->data.flow_ctl_cb.cb(op->data.flow_ctl_cb.arg, GRPC_OP_OK); + curop++; + break; + case GRPC_OP_METADATA: + hpack_enc(compressor, op->data.metadata, &st); + curop++; + break; + case GRPC_OP_DEADLINE: + deadline_enc(compressor, op->data.deadline, &st); + curop++; + break; + case GRPC_OP_METADATA_BOUNDARY: + ensure_frame_type(&st, HEADER, 0); + finish_frame(&st, 1, 0); + st.last_was_header = 0; /* force a new header frame */ + curop++; + break; + case GRPC_OP_SLICE: + slice = op->data.slice; if (st.cur_frame_type == DATA && st.output->length - st.output_length_at_start_of_frame == GRPC_CHTTP2_MAX_PAYLOAD_LENGTH) { finish_frame(&st, 0, 0); } ensure_frame_type(&st, DATA, 1); - max_take_size = - GPR_MIN(max_bytes - st.output_size, - GRPC_CHTTP2_MAX_PAYLOAD_LENGTH + - st.output_length_at_start_of_frame - st.output->length); + max_take_size = GRPC_CHTTP2_MAX_PAYLOAD_LENGTH + + st.output_length_at_start_of_frame - st.output->length; if (GPR_SLICE_LENGTH(slice) > max_take_size) { slice = gpr_slice_split_head(&op->data.slice, max_take_size); } else { /* consume this op immediately */ curop++; } - st.output_size += GPR_SLICE_LENGTH(slice); gpr_slice_buffer_add(output, slice); break; } } -exit_loop: if (eof && st.cur_frame_type == NONE) { begin_frame(&st, DATA); } - finish_frame(&st, 1, eof && curop == nops); - - nops -= curop; - *ops_count = nops; - memmove(ops, ops + curop, nops * sizeof(grpc_stream_op)); - - return st.output_size; + finish_frame(&st, 1, eof); } |