diff options
Diffstat (limited to 'src/core/transport/chttp2/stream_encoder.c')
-rw-r--r-- | src/core/transport/chttp2/stream_encoder.c | 553 |
1 files changed, 553 insertions, 0 deletions
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c new file mode 100644 index 0000000000..d46366d8b2 --- /dev/null +++ b/src/core/transport/chttp2/stream_encoder.c @@ -0,0 +1,553 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/transport/chttp2/stream_encoder.h" + +#include <assert.h> +#include <string.h> + +#include <grpc/support/log.h> +#include <grpc/support/useful.h> +#include "src/core/transport/chttp2/hpack_table.h" +#include "src/core/transport/chttp2/timeout_encoding.h" +#include "src/core/transport/chttp2/varint.h" + +#define HASH_FRAGMENT_1(x) ((x)&255) +#define HASH_FRAGMENT_2(x) ((x >> 8) & 255) +#define HASH_FRAGMENT_3(x) ((x >> 16) & 255) +#define HASH_FRAGMENT_4(x) ((x >> 24) & 255) + +/* if the probability of this item being seen again is < 1/x then don't add + it to the table */ +#define ONE_ON_ADD_PROBABILITY 128 +/* don't consider adding anything bigger than this to the hpack table */ +#define MAX_DECODER_SPACE_USAGE 512 + +/* what kind of frame our we encoding? */ +typedef enum { HEADER, DATA, NONE } frame_type; + +typedef struct { + frame_type cur_frame_type; + /* number of bytes in 'output' when we started the frame - used to calculate + frame length */ + size_t output_length_at_start_of_frame; + /* index (in output) of the header for the current frame */ + size_t header_idx; + /* was the last frame emitted a header? (if yes, we'll need a CONTINUATION */ + gpr_uint8 last_was_header; + /* output stream id */ + gpr_uint32 stream_id; + /* number of flow controlled bytes written */ + gpr_uint32 output_size; + gpr_slice_buffer *output; +} framer_state; + +/* fills p (which is expected to be 9 bytes long) with a data frame header */ +static void fill_header(gpr_uint8 *p, gpr_uint8 type, gpr_uint32 id, + gpr_uint32 len, gpr_uint8 flags) { + *p++ = len >> 16; + *p++ = len >> 8; + *p++ = len; + *p++ = type; + *p++ = flags; + *p++ = id >> 24; + *p++ = id >> 16; + *p++ = id >> 8; + *p++ = id; +} + +/* finish a frame - fill in the previously reserved header */ +static void finish_frame(framer_state *st, int is_header_boundary, + int is_last_in_stream) { + gpr_uint8 type = 0xff; + switch (st->cur_frame_type) { + case HEADER: + type = st->last_was_header ? GRPC_CHTTP2_FRAME_CONTINUATION + : GRPC_CHTTP2_FRAME_HEADER; + st->last_was_header = 1; + break; + case DATA: + type = GRPC_CHTTP2_FRAME_DATA; + st->last_was_header = 0; + is_header_boundary = 0; + break; + case NONE: + return; + } + fill_header(GPR_SLICE_START_PTR(st->output->slices[st->header_idx]), type, + st->stream_id, + st->output->length - st->output_length_at_start_of_frame, + (is_last_in_stream ? GRPC_CHTTP2_DATA_FLAG_END_STREAM : 0) | + (is_header_boundary ? GRPC_CHTTP2_DATA_FLAG_END_HEADERS : 0)); + st->cur_frame_type = NONE; +} + +/* begin a new frame: reserve off header space, remember how many bytes we'd + output before beginning */ +static void begin_frame(framer_state *st, frame_type type) { + GPR_ASSERT(type != NONE); + GPR_ASSERT(st->cur_frame_type == NONE); + st->cur_frame_type = type; + st->header_idx = + gpr_slice_buffer_add_indexed(st->output, gpr_slice_malloc(9)); + st->output_length_at_start_of_frame = st->output->length; +} + +/* make sure that the current frame is of the type desired, and has sufficient + space to add at least about_to_add bytes -- finishes the current frame if + needed */ +static void ensure_frame_type(framer_state *st, frame_type type, + int need_bytes) { + if (st->cur_frame_type == type && + st->output->length - st->output_length_at_start_of_frame + need_bytes <= + GRPC_CHTTP2_MAX_PAYLOAD_LENGTH) { + return; + } + finish_frame(st, type != HEADER, 0); + begin_frame(st, type); +} + +/* increment a filter count, halve all counts if one element reaches max */ +static void inc_filter(gpr_uint8 idx, gpr_uint32 *sum, gpr_uint8 *elems) { + elems[idx]++; + if (elems[idx] < 255) { + (*sum)++; + } else { + int i; + *sum = 0; + for (i = 0; i < GRPC_CHTTP2_HPACKC_NUM_FILTERS; i++) { + elems[i] /= 2; + (*sum) += elems[i]; + } + } +} + +static void add_header_data(framer_state *st, gpr_slice slice) { + size_t len = GPR_SLICE_LENGTH(slice); + size_t remaining; + if (len == 0) return; + ensure_frame_type(st, HEADER, 1); + remaining = GRPC_CHTTP2_MAX_PAYLOAD_LENGTH + + st->output_length_at_start_of_frame - st->output->length; + if (len <= remaining) { + gpr_slice_buffer_add(st->output, slice); + } else { + gpr_slice_buffer_add(st->output, gpr_slice_split_head(&slice, remaining)); + add_header_data(st, slice); + } +} + +static gpr_uint8 *add_tiny_header_data(framer_state *st, int len) { + ensure_frame_type(st, HEADER, len); + return gpr_slice_buffer_tiny_add(st->output, len); +} + +static void add_elem(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem) { + gpr_uint32 key_hash = elem->key->hash; + gpr_uint32 elem_hash = key_hash ^ elem->value->hash; + gpr_uint32 new_index = c->tail_remote_index + c->table_elems + 1; + gpr_uint32 elem_size = 32 + GPR_SLICE_LENGTH(elem->key->slice) + + GPR_SLICE_LENGTH(elem->value->slice); + int drop_ref; + + /* Reserve space for this element in the remote table: if this overflows + the current table, drop elements until it fits, matching the decompressor + algorithm */ + /* TODO(ctiller): constant */ + while (c->table_size + elem_size > 4096) { + c->tail_remote_index++; + GPR_ASSERT(c->tail_remote_index > 0); + GPR_ASSERT(c->table_size >= + c->table_elem_size[c->tail_remote_index % + GRPC_CHTTP2_HPACKC_MAX_TABLE_ELEMS]); + GPR_ASSERT(c->table_elems > 0); + c->table_size -= c->table_elem_size[c->tail_remote_index % + GRPC_CHTTP2_HPACKC_MAX_TABLE_ELEMS]; + c->table_elems--; + } + GPR_ASSERT(c->table_elems < GRPC_CHTTP2_HPACKC_MAX_TABLE_ELEMS); + c->table_elem_size[new_index % GRPC_CHTTP2_HPACKC_MAX_TABLE_ELEMS] = + elem_size; + c->table_size += elem_size; + c->table_elems++; + + /* Store this element into {entries,indices}_elem */ + if (c->entries_elems[HASH_FRAGMENT_2(elem_hash)] == elem) { + /* already there: update with new index */ + c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index; + drop_ref = 1; + } else if (c->entries_elems[HASH_FRAGMENT_3(elem_hash)] == elem) { + /* already there (cuckoo): update with new index */ + c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index; + drop_ref = 1; + } else if (c->entries_elems[HASH_FRAGMENT_2(elem_hash)] == NULL) { + /* not there, but a free element: add */ + c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = elem; + c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index; + drop_ref = 0; + } else if (c->entries_elems[HASH_FRAGMENT_3(elem_hash)] == NULL) { + /* not there (cuckoo), but a free element: add */ + c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = elem; + c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index; + drop_ref = 0; + } else if (c->indices_elems[HASH_FRAGMENT_2(elem_hash)] < + c->indices_elems[HASH_FRAGMENT_3(elem_hash)]) { + /* not there: replace oldest */ + grpc_mdelem_unref(c->entries_elems[HASH_FRAGMENT_2(elem_hash)]); + c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = elem; + c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index; + drop_ref = 0; + } else { + /* not there: replace oldest */ + grpc_mdelem_unref(c->entries_elems[HASH_FRAGMENT_3(elem_hash)]); + c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = elem; + c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index; + drop_ref = 0; + } + + /* do exactly the same for the key (so we can find by that again too) */ + + if (c->entries_keys[HASH_FRAGMENT_2(key_hash)] == elem->key) { + c->indices_keys[HASH_FRAGMENT_2(key_hash)] = new_index; + } else if (c->entries_keys[HASH_FRAGMENT_3(key_hash)] == elem->key) { + c->indices_keys[HASH_FRAGMENT_3(key_hash)] = new_index; + } else if (c->entries_keys[HASH_FRAGMENT_2(key_hash)] == NULL) { + c->entries_keys[HASH_FRAGMENT_2(key_hash)] = grpc_mdstr_ref(elem->key); + c->indices_keys[HASH_FRAGMENT_2(key_hash)] = new_index; + } else if (c->entries_keys[HASH_FRAGMENT_3(key_hash)] == NULL) { + c->entries_keys[HASH_FRAGMENT_3(key_hash)] = grpc_mdstr_ref(elem->key); + c->indices_keys[HASH_FRAGMENT_3(key_hash)] = new_index; + } else if (c->indices_keys[HASH_FRAGMENT_2(key_hash)] < + c->indices_keys[HASH_FRAGMENT_3(key_hash)]) { + grpc_mdstr_unref(c->entries_keys[HASH_FRAGMENT_2(key_hash)]); + c->entries_keys[HASH_FRAGMENT_2(key_hash)] = grpc_mdstr_ref(elem->key); + c->indices_keys[HASH_FRAGMENT_2(key_hash)] = new_index; + } else { + grpc_mdstr_unref(c->entries_keys[HASH_FRAGMENT_3(key_hash)]); + c->entries_keys[HASH_FRAGMENT_3(key_hash)] = grpc_mdstr_ref(elem->key); + c->indices_keys[HASH_FRAGMENT_3(key_hash)] = new_index; + } + + if (drop_ref) { + grpc_mdelem_unref(elem); + } +} + +static void emit_indexed(grpc_chttp2_hpack_compressor *c, gpr_uint32 index, + framer_state *st) { + int len = GRPC_CHTTP2_VARINT_LENGTH(index, 1); + GRPC_CHTTP2_WRITE_VARINT(index, 1, 0x80, add_tiny_header_data(st, len), len); +} + +static void emit_lithdr_incidx(grpc_chttp2_hpack_compressor *c, + gpr_uint32 key_index, grpc_mdstr *value, + framer_state *st) { + int len_pfx = GRPC_CHTTP2_VARINT_LENGTH(key_index, 2); + int len_val = GPR_SLICE_LENGTH(value->slice); + int len_val_len = GRPC_CHTTP2_VARINT_LENGTH(len_val, 1); + GRPC_CHTTP2_WRITE_VARINT(key_index, 2, 0x40, + add_tiny_header_data(st, len_pfx), len_pfx); + GRPC_CHTTP2_WRITE_VARINT(len_val, 1, 0x00, + add_tiny_header_data(st, len_val_len), len_val_len); + add_header_data(st, gpr_slice_ref(value->slice)); +} + +static void emit_lithdr_noidx(grpc_chttp2_hpack_compressor *c, + gpr_uint32 key_index, grpc_mdstr *value, + framer_state *st) { + int len_pfx = GRPC_CHTTP2_VARINT_LENGTH(key_index, 4); + int len_val = GPR_SLICE_LENGTH(value->slice); + int len_val_len = GRPC_CHTTP2_VARINT_LENGTH(len_val, 1); + GRPC_CHTTP2_WRITE_VARINT(key_index, 4, 0x00, + add_tiny_header_data(st, len_pfx), len_pfx); + GRPC_CHTTP2_WRITE_VARINT(len_val, 1, 0x00, + add_tiny_header_data(st, len_val_len), len_val_len); + add_header_data(st, gpr_slice_ref(value->slice)); +} + +static void emit_lithdr_incidx_v(grpc_chttp2_hpack_compressor *c, + grpc_mdstr *key, grpc_mdstr *value, + framer_state *st) { + int len_key = GPR_SLICE_LENGTH(key->slice); + int len_val = GPR_SLICE_LENGTH(value->slice); + int len_key_len = GRPC_CHTTP2_VARINT_LENGTH(len_key, 1); + int len_val_len = GRPC_CHTTP2_VARINT_LENGTH(len_val, 1); + *add_tiny_header_data(st, 1) = 0x40; + GRPC_CHTTP2_WRITE_VARINT(len_key, 1, 0x00, + add_tiny_header_data(st, len_key_len), len_key_len); + add_header_data(st, gpr_slice_ref(key->slice)); + GRPC_CHTTP2_WRITE_VARINT(len_val, 1, 0x00, + add_tiny_header_data(st, len_val_len), len_val_len); + add_header_data(st, gpr_slice_ref(value->slice)); +} + +static void emit_lithdr_noidx_v(grpc_chttp2_hpack_compressor *c, + grpc_mdstr *key, grpc_mdstr *value, + framer_state *st) { + int len_key = GPR_SLICE_LENGTH(key->slice); + int len_val = GPR_SLICE_LENGTH(value->slice); + int len_key_len = GRPC_CHTTP2_VARINT_LENGTH(len_key, 1); + int len_val_len = GRPC_CHTTP2_VARINT_LENGTH(len_val, 1); + *add_tiny_header_data(st, 1) = 0x00; + GRPC_CHTTP2_WRITE_VARINT(len_key, 1, 0x00, + add_tiny_header_data(st, len_key_len), len_key_len); + add_header_data(st, gpr_slice_ref(key->slice)); + GRPC_CHTTP2_WRITE_VARINT(len_val, 1, 0x00, + add_tiny_header_data(st, len_val_len), len_val_len); + add_header_data(st, gpr_slice_ref(value->slice)); +} + +static gpr_uint32 dynidx(grpc_chttp2_hpack_compressor *c, gpr_uint32 index) { + return 1 + GRPC_CHTTP2_LAST_STATIC_ENTRY + c->tail_remote_index + + c->table_elems - index; +} + +/* encode an mdelem, taking ownership of it */ +static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem, + framer_state *st) { + gpr_uint32 key_hash = elem->key->hash; + gpr_uint32 elem_hash = key_hash ^ elem->value->hash; + size_t decoder_space_usage; + int should_add_elem; + + inc_filter(HASH_FRAGMENT_1(elem_hash), &c->filter_elems_sum, c->filter_elems); + + /* is this elem currently in the decoders table? */ + + if (c->entries_elems[HASH_FRAGMENT_2(elem_hash)] == elem && + c->indices_elems[HASH_FRAGMENT_2(elem_hash)] > c->tail_remote_index) { + /* HIT: complete element (first cuckoo hash) */ + emit_indexed(c, dynidx(c, c->indices_elems[HASH_FRAGMENT_2(elem_hash)]), + st); + grpc_mdelem_unref(elem); + return; + } + + if (c->entries_elems[HASH_FRAGMENT_3(elem_hash)] == elem && + c->indices_elems[HASH_FRAGMENT_3(elem_hash)] > c->tail_remote_index) { + /* HIT: complete element (second cuckoo hash) */ + emit_indexed(c, dynidx(c, c->indices_elems[HASH_FRAGMENT_3(elem_hash)]), + st); + grpc_mdelem_unref(elem); + return; + } + + /* should this elem be in the table? */ + decoder_space_usage = 32 + GPR_SLICE_LENGTH(elem->key->slice) + + GPR_SLICE_LENGTH(elem->value->slice); + should_add_elem = decoder_space_usage < MAX_DECODER_SPACE_USAGE && + c->filter_elems[HASH_FRAGMENT_1(elem_hash)] >= + c->filter_elems_sum / ONE_ON_ADD_PROBABILITY; + + /* no hits for the elem... maybe there's a key? */ + + if (c->entries_keys[HASH_FRAGMENT_2(key_hash)] == elem->key && + c->indices_keys[HASH_FRAGMENT_2(key_hash)] > c->tail_remote_index) { + /* HIT: key (first cuckoo hash) */ + if (should_add_elem) { + emit_lithdr_incidx(c, + dynidx(c, c->indices_keys[HASH_FRAGMENT_2(key_hash)]), + elem->value, st); + add_elem(c, elem); + } else { + emit_lithdr_noidx(c, + dynidx(c, c->indices_keys[HASH_FRAGMENT_2(key_hash)]), + elem->value, st); + grpc_mdelem_unref(elem); + } + return; + } + + if (c->entries_keys[HASH_FRAGMENT_3(key_hash)] == elem->key && + c->indices_keys[HASH_FRAGMENT_3(key_hash)] > c->tail_remote_index) { + /* HIT: key (first cuckoo hash) */ + if (should_add_elem) { + emit_lithdr_incidx(c, + dynidx(c, c->indices_keys[HASH_FRAGMENT_3(key_hash)]), + elem->value, st); + add_elem(c, elem); + } else { + emit_lithdr_noidx(c, + dynidx(c, c->indices_keys[HASH_FRAGMENT_3(key_hash)]), + elem->value, st); + grpc_mdelem_unref(elem); + } + return; + } + + /* no elem, key in the table... fall back to literal emission */ + + if (should_add_elem) { + emit_lithdr_incidx_v(c, elem->key, elem->value, st); + add_elem(c, elem); + } else { + emit_lithdr_noidx_v(c, elem->key, elem->value, st); + grpc_mdelem_unref(elem); + } +} + +#define STRLEN_LIT(x) (sizeof(x) - 1) +#define TIMEOUT_KEY "grpc-timeout" + +static void deadline_enc(grpc_chttp2_hpack_compressor *c, gpr_timespec deadline, + framer_state *st) { + char timeout_str[32]; + grpc_chttp2_encode_timeout(gpr_time_sub(deadline, gpr_now()), timeout_str); + hpack_enc(c, grpc_mdelem_from_metadata_strings( + c->mdctx, grpc_mdstr_ref(c->timeout_key_str), + grpc_mdstr_from_string(c->mdctx, timeout_str)), + st); +} + +gpr_slice grpc_chttp2_data_frame_create_empty_close(gpr_uint32 id) { + gpr_slice slice = gpr_slice_malloc(9); + fill_header(GPR_SLICE_START_PTR(slice), GRPC_CHTTP2_FRAME_DATA, id, 0, 1); + return slice; +} + +void grpc_chttp2_hpack_compressor_init(grpc_chttp2_hpack_compressor *c, + grpc_mdctx *ctx) { + memset(c, 0, sizeof(*c)); + c->mdctx = ctx; + c->timeout_key_str = grpc_mdstr_from_string(ctx, "grpc-timeout"); +} + +void grpc_chttp2_hpack_compressor_destroy(grpc_chttp2_hpack_compressor *c) { + int i; + for (i = 0; i < GRPC_CHTTP2_HPACKC_NUM_VALUES; i++) { + if (c->entries_keys[i]) grpc_mdstr_unref(c->entries_keys[i]); + if (c->entries_elems[i]) grpc_mdelem_unref(c->entries_elems[i]); + } + grpc_mdstr_unref(c->timeout_key_str); +} + +gpr_uint32 grpc_chttp2_encode_some(grpc_stream_op *ops, size_t *ops_count, + int eof, gpr_slice_buffer *output, + gpr_uint32 max_bytes, gpr_uint32 stream_id, + grpc_chttp2_hpack_compressor *compressor) { + framer_state st; + gpr_slice slice; + grpc_stream_op *op; + gpr_uint32 max_take_size; + gpr_uint32 curop = 0; + gpr_uint32 nops = *ops_count; + gpr_uint8 *p; + + GPR_ASSERT(stream_id != 0); + + st.cur_frame_type = NONE; + st.last_was_header = 0; + st.stream_id = stream_id; + st.output = output; + st.output_size = 0; + + while (curop < nops) { + GPR_ASSERT(st.output_size <= max_bytes); + op = &ops[curop]; + switch (op->type) { + case GRPC_NO_OP: + curop++; + break; + case GRPC_OP_FLOW_CTL_CB: + op->data.flow_ctl_cb.cb(op->data.flow_ctl_cb.arg, GRPC_OP_OK); + curop++; + break; + case GRPC_OP_METADATA: + hpack_enc(compressor, op->data.metadata, &st); + curop++; + break; + case GRPC_OP_DEADLINE: + deadline_enc(compressor, op->data.deadline, &st); + curop++; + break; + case GRPC_OP_METADATA_BOUNDARY: + ensure_frame_type(&st, HEADER, 0); + finish_frame(&st, 1, 0); + st.last_was_header = 0; /* force a new header frame */ + curop++; + break; + case GRPC_OP_BEGIN_MESSAGE: + /* begin op: for now we just convert the op to a slice and fall + through - this lets us reuse the slice framing code below */ + slice = gpr_slice_malloc(5); + p = GPR_SLICE_START_PTR(slice); + p[0] = 0; + p[1] = op->data.begin_message.length >> 24; + p[2] = op->data.begin_message.length >> 16; + p[3] = op->data.begin_message.length >> 8; + p[4] = op->data.begin_message.length; + op->type = GRPC_OP_SLICE; + op->data.slice = slice; + /* fallthrough */ + case GRPC_OP_SLICE: + slice = op->data.slice; + if (!GPR_SLICE_LENGTH(slice)) { + curop++; + break; + } + if (st.output_size == max_bytes) { + goto exit_loop; + } + if (st.cur_frame_type == DATA && + st.output->length - st.output_length_at_start_of_frame == + GRPC_CHTTP2_MAX_PAYLOAD_LENGTH) { + finish_frame(&st, 0, 0); + } + ensure_frame_type(&st, DATA, 1); + max_take_size = + GPR_MIN(max_bytes - st.output_size, + GRPC_CHTTP2_MAX_PAYLOAD_LENGTH + + st.output_length_at_start_of_frame - st.output->length); + if (GPR_SLICE_LENGTH(slice) > max_take_size) { + slice = gpr_slice_split_head(&op->data.slice, max_take_size); + } else { + /* consume this op immediately */ + curop++; + } + st.output_size += GPR_SLICE_LENGTH(slice); + gpr_slice_buffer_add(output, slice); + break; + } + } +exit_loop: + if (eof && st.cur_frame_type == NONE) { + begin_frame(&st, DATA); + } + finish_frame(&st, 1, eof && curop == nops); + + nops -= curop; + *ops_count = nops; + memmove(ops, ops + curop, nops * sizeof(grpc_stream_op)); + + return st.output_size; +} |