diff options
Diffstat (limited to 'src/core/transport/chttp2/stream_encoder.c')
-rw-r--r-- | src/core/transport/chttp2/stream_encoder.c | 116 |
1 files changed, 72 insertions, 44 deletions
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c index 79cce553fa..5ca31d6bc7 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/stream_encoder.c @@ -43,7 +43,7 @@ #include "src/core/transport/chttp2/timeout_encoding.h" #include "src/core/transport/chttp2/varint.h" -#define HASH_FRAGMENT_1(x) ((x) & 255) +#define HASH_FRAGMENT_1(x) ((x)&255) #define HASH_FRAGMENT_2(x) ((x >> 8) & 255) #define HASH_FRAGMENT_3(x) ((x >> 16) & 255) #define HASH_FRAGMENT_4(x) ((x >> 24) & 255) @@ -171,13 +171,15 @@ static gpr_uint8 *add_tiny_header_data(framer_state *st, int len) { return gpr_slice_buffer_tiny_add(st->output, len); } -static void add_elem(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem) { +/* add an element to the decoder table: returns metadata element to unref */ +static grpc_mdelem *add_elem(grpc_chttp2_hpack_compressor *c, + grpc_mdelem *elem) { gpr_uint32 key_hash = elem->key->hash; gpr_uint32 elem_hash = GRPC_MDSTR_KV_HASH(key_hash, elem->value->hash); gpr_uint32 new_index = c->tail_remote_index + c->table_elems + 1; gpr_uint32 elem_size = 32 + GPR_SLICE_LENGTH(elem->key->slice) + GPR_SLICE_LENGTH(elem->value->slice); - int drop_ref; + grpc_mdelem *elem_to_unref; /* Reserve space for this element in the remote table: if this overflows the current table, drop elements until it fits, matching the decompressor @@ -204,34 +206,32 @@ static void add_elem(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem) { if (c->entries_elems[HASH_FRAGMENT_2(elem_hash)] == elem) { /* already there: update with new index */ c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index; - drop_ref = 1; + elem_to_unref = elem; } else if (c->entries_elems[HASH_FRAGMENT_3(elem_hash)] == elem) { /* already there (cuckoo): update with new index */ c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index; - drop_ref = 1; + elem_to_unref = elem; } else if (c->entries_elems[HASH_FRAGMENT_2(elem_hash)] == NULL) { /* not there, but a free element: add */ c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = elem; c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index; - drop_ref = 0; + elem_to_unref = NULL; } else if (c->entries_elems[HASH_FRAGMENT_3(elem_hash)] == NULL) { /* not there (cuckoo), but a free element: add */ c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = elem; c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index; - drop_ref = 0; + elem_to_unref = NULL; } else if (c->indices_elems[HASH_FRAGMENT_2(elem_hash)] < c->indices_elems[HASH_FRAGMENT_3(elem_hash)]) { /* not there: replace oldest */ - grpc_mdelem_unref(c->entries_elems[HASH_FRAGMENT_2(elem_hash)]); + elem_to_unref = c->entries_elems[HASH_FRAGMENT_2(elem_hash)]; c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = elem; c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index; - drop_ref = 0; } else { /* not there: replace oldest */ - grpc_mdelem_unref(c->entries_elems[HASH_FRAGMENT_3(elem_hash)]); + elem_to_unref = c->entries_elems[HASH_FRAGMENT_3(elem_hash)]; c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = elem; c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index; - drop_ref = 0; } /* do exactly the same for the key (so we can find by that again too) */ @@ -257,9 +257,7 @@ static void add_elem(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem) { c->indices_keys[HASH_FRAGMENT_3(key_hash)] = new_index; } - if (drop_ref) { - grpc_mdelem_unref(elem); - } + return elem_to_unref; } static void emit_indexed(grpc_chttp2_hpack_compressor *c, gpr_uint32 index, @@ -348,9 +346,9 @@ static gpr_uint32 dynidx(grpc_chttp2_hpack_compressor *c, gpr_uint32 index) { c->table_elems - index; } -/* encode an mdelem, taking ownership of it */ -static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem, - framer_state *st) { +/* encode an mdelem; returns metadata element to unref */ +static grpc_mdelem *hpack_enc(grpc_chttp2_hpack_compressor *c, + grpc_mdelem *elem, framer_state *st) { gpr_uint32 key_hash = elem->key->hash; gpr_uint32 elem_hash = GRPC_MDSTR_KV_HASH(key_hash, elem->value->hash); size_t decoder_space_usage; @@ -366,8 +364,7 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem, /* HIT: complete element (first cuckoo hash) */ emit_indexed(c, dynidx(c, c->indices_elems[HASH_FRAGMENT_2(elem_hash)]), st); - grpc_mdelem_unref(elem); - return; + return elem; } if (c->entries_elems[HASH_FRAGMENT_3(elem_hash)] == elem && @@ -375,8 +372,7 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem, /* HIT: complete element (second cuckoo hash) */ emit_indexed(c, dynidx(c, c->indices_elems[HASH_FRAGMENT_3(elem_hash)]), st); - grpc_mdelem_unref(elem); - return; + return elem; } /* should this elem be in the table? */ @@ -394,12 +390,12 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem, /* HIT: key (first cuckoo hash) */ if (should_add_elem) { emit_lithdr_incidx(c, dynidx(c, indices_key), elem, st); - add_elem(c, elem); + return add_elem(c, elem); } else { emit_lithdr_noidx(c, dynidx(c, indices_key), elem, st); - grpc_mdelem_unref(elem); + return elem; } - return; + abort(); } indices_key = c->indices_keys[HASH_FRAGMENT_3(key_hash)]; @@ -408,23 +404,24 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem, /* HIT: key (first cuckoo hash) */ if (should_add_elem) { emit_lithdr_incidx(c, dynidx(c, indices_key), elem, st); - add_elem(c, elem); + return add_elem(c, elem); } else { emit_lithdr_noidx(c, dynidx(c, indices_key), elem, st); - grpc_mdelem_unref(elem); + return elem; } - return; + abort(); } /* no elem, key in the table... fall back to literal emission */ if (should_add_elem) { emit_lithdr_incidx_v(c, elem, st); - add_elem(c, elem); + return add_elem(c, elem); } else { emit_lithdr_noidx_v(c, elem, st); - grpc_mdelem_unref(elem); + return elem; } + abort(); } #define STRLEN_LIT(x) (sizeof(x) - 1) @@ -433,11 +430,13 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem, static void deadline_enc(grpc_chttp2_hpack_compressor *c, gpr_timespec deadline, framer_state *st) { char timeout_str[GRPC_CHTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE]; + grpc_mdelem *mdelem; grpc_chttp2_encode_timeout(gpr_time_sub(deadline, gpr_now()), timeout_str); - hpack_enc(c, grpc_mdelem_from_metadata_strings( - c->mdctx, grpc_mdstr_ref(c->timeout_key_str), - grpc_mdstr_from_string(c->mdctx, timeout_str)), - st); + mdelem = grpc_mdelem_from_metadata_strings( + c->mdctx, grpc_mdstr_ref(c->timeout_key_str), + grpc_mdstr_from_string(c->mdctx, timeout_str)); + mdelem = hpack_enc(c, mdelem, st); + if (mdelem) grpc_mdelem_unref(mdelem); } gpr_slice grpc_chttp2_data_frame_create_empty_close(gpr_uint32 id) { @@ -480,10 +479,9 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count, /* skip */ curop++; break; - case GRPC_OP_FLOW_CTL_CB: - case GRPC_OP_DEADLINE: case GRPC_OP_METADATA: - case GRPC_OP_METADATA_BOUNDARY: + grpc_metadata_batch_assert_ok(&op->data.metadata); + case GRPC_OP_FLOW_CTL_CB: /* these just get copied as they don't impact the number of flow controlled bytes */ grpc_sopb_append(outops, op, 1); @@ -530,6 +528,12 @@ exit_loop: *inops_count -= curop; memmove(inops, inops + curop, *inops_count * sizeof(grpc_stream_op)); + for (curop = 0; curop < *inops_count; curop++) { + if (inops[curop].type == GRPC_OP_METADATA) { + grpc_metadata_batch_assert_ok(&inops[curop].data.metadata); + } + } + return flow_controlled_bytes_taken; } @@ -542,6 +546,10 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, grpc_stream_op *op; gpr_uint32 max_take_size; gpr_uint32 curop = 0; + gpr_uint32 unref_op; + grpc_mdctx *mdctx = compressor->mdctx; + grpc_linked_mdelem *l; + int need_unref = 0; GPR_ASSERT(stream_id != 0); @@ -564,14 +572,19 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, curop++; break; case GRPC_OP_METADATA: - hpack_enc(compressor, op->data.metadata, &st); - curop++; - break; - case GRPC_OP_DEADLINE: - deadline_enc(compressor, op->data.deadline, &st); - curop++; - break; - case GRPC_OP_METADATA_BOUNDARY: + /* Encode a metadata batch; store the returned values, representing + a metadata element that needs to be unreffed back into the metadata + slot. THIS MAY NOT BE THE SAME ELEMENT (if a decoder table slot got + updated). After this loop, we'll do a batch unref of elements. */ + need_unref |= op->data.metadata.garbage.head != NULL; + grpc_metadata_batch_assert_ok(&op->data.metadata); + for (l = op->data.metadata.list.head; l; l = l->next) { + l->md = hpack_enc(compressor, l->md, &st); + need_unref |= l->md != NULL; + } + if (gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future) != 0) { + deadline_enc(compressor, op->data.metadata.deadline, &st); + } ensure_frame_type(&st, HEADER, 0); finish_frame(&st, 1, 0); st.last_was_header = 0; /* force a new header frame */ @@ -601,4 +614,19 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, begin_frame(&st, DATA); } finish_frame(&st, 1, eof); + + if (need_unref) { + grpc_mdctx_lock(mdctx); + for (unref_op = 0; unref_op < curop; unref_op++) { + op = &ops[unref_op]; + if (op->type != GRPC_OP_METADATA) continue; + for (l = op->data.metadata.list.head; l; l = l->next) { + if (l->md) grpc_mdctx_locked_mdelem_unref(mdctx, l->md); + } + for (l = op->data.metadata.garbage.head; l; l = l->next) { + grpc_mdctx_locked_mdelem_unref(mdctx, l->md); + } + } + grpc_mdctx_unlock(mdctx); + } } |