aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport/chttp2/stream_encoder.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/transport/chttp2/stream_encoder.c')
-rw-r--r--src/core/transport/chttp2/stream_encoder.c130
1 files changed, 84 insertions, 46 deletions
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c
index 8595a59879..92a36d0c16 100644
--- a/src/core/transport/chttp2/stream_encoder.c
+++ b/src/core/transport/chttp2/stream_encoder.c
@@ -68,8 +68,6 @@ typedef struct {
gpr_uint8 last_was_header;
/* output stream id */
gpr_uint32 stream_id;
- /* number of flow controlled bytes written */
- gpr_uint32 output_size;
gpr_slice_buffer *output;
} framer_state;
@@ -464,49 +462,31 @@ void grpc_chttp2_hpack_compressor_destroy(grpc_chttp2_hpack_compressor *c) {
grpc_mdstr_unref(c->timeout_key_str);
}
-gpr_uint32 grpc_chttp2_encode_some(grpc_stream_op *ops, size_t *ops_count,
- int eof, gpr_slice_buffer *output,
- gpr_uint32 max_bytes, gpr_uint32 stream_id,
- grpc_chttp2_hpack_compressor *compressor) {
- framer_state st;
+gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count,
+ gpr_uint32 max_flow_controlled_bytes,
+ grpc_stream_op_buffer *outops) {
gpr_slice slice;
grpc_stream_op *op;
gpr_uint32 max_take_size;
+ gpr_uint32 flow_controlled_bytes_taken = 0;
gpr_uint32 curop = 0;
- gpr_uint32 nops = *ops_count;
gpr_uint8 *p;
- GPR_ASSERT(stream_id != 0);
-
- st.cur_frame_type = NONE;
- st.last_was_header = 0;
- st.stream_id = stream_id;
- st.output = output;
- st.output_size = 0;
-
- while (curop < nops) {
- GPR_ASSERT(st.output_size <= max_bytes);
- op = &ops[curop];
+ while (curop < *inops_count) {
+ GPR_ASSERT(flow_controlled_bytes_taken <= max_flow_controlled_bytes);
+ op = &inops[curop];
switch (op->type) {
case GRPC_NO_OP:
+ /* skip */
curop++;
break;
case GRPC_OP_FLOW_CTL_CB:
- op->data.flow_ctl_cb.cb(op->data.flow_ctl_cb.arg, GRPC_OP_OK);
- curop++;
- break;
- case GRPC_OP_METADATA:
- hpack_enc(compressor, op->data.metadata, &st);
- curop++;
- break;
case GRPC_OP_DEADLINE:
- deadline_enc(compressor, op->data.deadline, &st);
- curop++;
- break;
+ case GRPC_OP_METADATA:
case GRPC_OP_METADATA_BOUNDARY:
- ensure_frame_type(&st, HEADER, 0);
- finish_frame(&st, 1, 0);
- st.last_was_header = 0; /* force a new header frame */
+ /* these just get copied as they don't impact the number of flow
+ controlled bytes */
+ grpc_sopb_append(outops, op, 1);
curop++;
break;
case GRPC_OP_BEGIN_MESSAGE:
@@ -525,42 +505,100 @@ gpr_uint32 grpc_chttp2_encode_some(grpc_stream_op *ops, size_t *ops_count,
case GRPC_OP_SLICE:
slice = op->data.slice;
if (!GPR_SLICE_LENGTH(slice)) {
+ /* skip zero length slices */
+ gpr_slice_unref(slice);
curop++;
break;
}
- if (st.output_size == max_bytes) {
+ max_take_size = max_flow_controlled_bytes - flow_controlled_bytes_taken;
+ if (max_take_size == 0) {
goto exit_loop;
}
+ if (GPR_SLICE_LENGTH(slice) > max_take_size) {
+ slice = gpr_slice_split_head(&op->data.slice, max_take_size);
+ grpc_sopb_add_slice(outops, slice);
+ } else {
+ /* consume this op immediately */
+ grpc_sopb_append(outops, op, 1);
+ curop++;
+ }
+ flow_controlled_bytes_taken += GPR_SLICE_LENGTH(slice);
+ break;
+ }
+ }
+exit_loop:
+ *inops_count -= curop;
+ memmove(inops, inops + curop, *inops_count * sizeof(grpc_stream_op));
+
+ return flow_controlled_bytes_taken;
+}
+
+void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
+ gpr_uint32 stream_id,
+ grpc_chttp2_hpack_compressor *compressor,
+ gpr_slice_buffer *output) {
+ framer_state st;
+ gpr_slice slice;
+ grpc_stream_op *op;
+ gpr_uint32 max_take_size;
+ gpr_uint32 curop = 0;
+
+ GPR_ASSERT(stream_id != 0);
+
+ st.cur_frame_type = NONE;
+ st.last_was_header = 0;
+ st.stream_id = stream_id;
+ st.output = output;
+
+ while (curop < ops_count) {
+ op = &ops[curop];
+ switch (op->type) {
+ case GRPC_NO_OP:
+ case GRPC_OP_BEGIN_MESSAGE:
+ gpr_log(
+ GPR_ERROR,
+ "These stream ops should be filtered out by grpc_chttp2_preencode");
+ abort();
+ case GRPC_OP_FLOW_CTL_CB:
+ op->data.flow_ctl_cb.cb(op->data.flow_ctl_cb.arg, GRPC_OP_OK);
+ curop++;
+ break;
+ case GRPC_OP_METADATA:
+ hpack_enc(compressor, op->data.metadata, &st);
+ curop++;
+ break;
+ case GRPC_OP_DEADLINE:
+ deadline_enc(compressor, op->data.deadline, &st);
+ curop++;
+ break;
+ case GRPC_OP_METADATA_BOUNDARY:
+ ensure_frame_type(&st, HEADER, 0);
+ finish_frame(&st, 1, 0);
+ st.last_was_header = 0; /* force a new header frame */
+ curop++;
+ break;
+ case GRPC_OP_SLICE:
+ slice = op->data.slice;
if (st.cur_frame_type == DATA &&
st.output->length - st.output_length_at_start_of_frame ==
GRPC_CHTTP2_MAX_PAYLOAD_LENGTH) {
finish_frame(&st, 0, 0);
}
ensure_frame_type(&st, DATA, 1);
- max_take_size =
- GPR_MIN(max_bytes - st.output_size,
- GRPC_CHTTP2_MAX_PAYLOAD_LENGTH +
- st.output_length_at_start_of_frame - st.output->length);
+ max_take_size = GRPC_CHTTP2_MAX_PAYLOAD_LENGTH +
+ st.output_length_at_start_of_frame - st.output->length;
if (GPR_SLICE_LENGTH(slice) > max_take_size) {
slice = gpr_slice_split_head(&op->data.slice, max_take_size);
} else {
/* consume this op immediately */
curop++;
}
- st.output_size += GPR_SLICE_LENGTH(slice);
gpr_slice_buffer_add(output, slice);
break;
}
}
-exit_loop:
if (eof && st.cur_frame_type == NONE) {
begin_frame(&st, DATA);
}
- finish_frame(&st, 1, eof && curop == nops);
-
- nops -= curop;
- *ops_count = nops;
- memmove(ops, ops + curop, nops * sizeof(grpc_stream_op));
-
- return st.output_size;
+ finish_frame(&st, 1, eof);
}