aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport/chttp2/stream_encoder.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/transport/chttp2/stream_encoder.c')
-rw-r--r--src/core/transport/chttp2/stream_encoder.c116
1 files changed, 72 insertions, 44 deletions
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c
index 79cce553fa..5ca31d6bc7 100644
--- a/src/core/transport/chttp2/stream_encoder.c
+++ b/src/core/transport/chttp2/stream_encoder.c
@@ -43,7 +43,7 @@
#include "src/core/transport/chttp2/timeout_encoding.h"
#include "src/core/transport/chttp2/varint.h"
-#define HASH_FRAGMENT_1(x) ((x) & 255)
+#define HASH_FRAGMENT_1(x) ((x)&255)
#define HASH_FRAGMENT_2(x) ((x >> 8) & 255)
#define HASH_FRAGMENT_3(x) ((x >> 16) & 255)
#define HASH_FRAGMENT_4(x) ((x >> 24) & 255)
@@ -171,13 +171,15 @@ static gpr_uint8 *add_tiny_header_data(framer_state *st, int len) {
return gpr_slice_buffer_tiny_add(st->output, len);
}
-static void add_elem(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem) {
+/* add an element to the decoder table: returns metadata element to unref */
+static grpc_mdelem *add_elem(grpc_chttp2_hpack_compressor *c,
+ grpc_mdelem *elem) {
gpr_uint32 key_hash = elem->key->hash;
gpr_uint32 elem_hash = GRPC_MDSTR_KV_HASH(key_hash, elem->value->hash);
gpr_uint32 new_index = c->tail_remote_index + c->table_elems + 1;
gpr_uint32 elem_size = 32 + GPR_SLICE_LENGTH(elem->key->slice) +
GPR_SLICE_LENGTH(elem->value->slice);
- int drop_ref;
+ grpc_mdelem *elem_to_unref;
/* Reserve space for this element in the remote table: if this overflows
the current table, drop elements until it fits, matching the decompressor
@@ -204,34 +206,32 @@ static void add_elem(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem) {
if (c->entries_elems[HASH_FRAGMENT_2(elem_hash)] == elem) {
/* already there: update with new index */
c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index;
- drop_ref = 1;
+ elem_to_unref = elem;
} else if (c->entries_elems[HASH_FRAGMENT_3(elem_hash)] == elem) {
/* already there (cuckoo): update with new index */
c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index;
- drop_ref = 1;
+ elem_to_unref = elem;
} else if (c->entries_elems[HASH_FRAGMENT_2(elem_hash)] == NULL) {
/* not there, but a free element: add */
c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = elem;
c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index;
- drop_ref = 0;
+ elem_to_unref = NULL;
} else if (c->entries_elems[HASH_FRAGMENT_3(elem_hash)] == NULL) {
/* not there (cuckoo), but a free element: add */
c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = elem;
c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index;
- drop_ref = 0;
+ elem_to_unref = NULL;
} else if (c->indices_elems[HASH_FRAGMENT_2(elem_hash)] <
c->indices_elems[HASH_FRAGMENT_3(elem_hash)]) {
/* not there: replace oldest */
- grpc_mdelem_unref(c->entries_elems[HASH_FRAGMENT_2(elem_hash)]);
+ elem_to_unref = c->entries_elems[HASH_FRAGMENT_2(elem_hash)];
c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = elem;
c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index;
- drop_ref = 0;
} else {
/* not there: replace oldest */
- grpc_mdelem_unref(c->entries_elems[HASH_FRAGMENT_3(elem_hash)]);
+ elem_to_unref = c->entries_elems[HASH_FRAGMENT_3(elem_hash)];
c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = elem;
c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index;
- drop_ref = 0;
}
/* do exactly the same for the key (so we can find by that again too) */
@@ -257,9 +257,7 @@ static void add_elem(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem) {
c->indices_keys[HASH_FRAGMENT_3(key_hash)] = new_index;
}
- if (drop_ref) {
- grpc_mdelem_unref(elem);
- }
+ return elem_to_unref;
}
static void emit_indexed(grpc_chttp2_hpack_compressor *c, gpr_uint32 index,
@@ -348,9 +346,9 @@ static gpr_uint32 dynidx(grpc_chttp2_hpack_compressor *c, gpr_uint32 index) {
c->table_elems - index;
}
-/* encode an mdelem, taking ownership of it */
-static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem,
- framer_state *st) {
+/* encode an mdelem; returns metadata element to unref */
+static grpc_mdelem *hpack_enc(grpc_chttp2_hpack_compressor *c,
+ grpc_mdelem *elem, framer_state *st) {
gpr_uint32 key_hash = elem->key->hash;
gpr_uint32 elem_hash = GRPC_MDSTR_KV_HASH(key_hash, elem->value->hash);
size_t decoder_space_usage;
@@ -366,8 +364,7 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem,
/* HIT: complete element (first cuckoo hash) */
emit_indexed(c, dynidx(c, c->indices_elems[HASH_FRAGMENT_2(elem_hash)]),
st);
- grpc_mdelem_unref(elem);
- return;
+ return elem;
}
if (c->entries_elems[HASH_FRAGMENT_3(elem_hash)] == elem &&
@@ -375,8 +372,7 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem,
/* HIT: complete element (second cuckoo hash) */
emit_indexed(c, dynidx(c, c->indices_elems[HASH_FRAGMENT_3(elem_hash)]),
st);
- grpc_mdelem_unref(elem);
- return;
+ return elem;
}
/* should this elem be in the table? */
@@ -394,12 +390,12 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem,
/* HIT: key (first cuckoo hash) */
if (should_add_elem) {
emit_lithdr_incidx(c, dynidx(c, indices_key), elem, st);
- add_elem(c, elem);
+ return add_elem(c, elem);
} else {
emit_lithdr_noidx(c, dynidx(c, indices_key), elem, st);
- grpc_mdelem_unref(elem);
+ return elem;
}
- return;
+ abort();
}
indices_key = c->indices_keys[HASH_FRAGMENT_3(key_hash)];
@@ -408,23 +404,24 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem,
/* HIT: key (first cuckoo hash) */
if (should_add_elem) {
emit_lithdr_incidx(c, dynidx(c, indices_key), elem, st);
- add_elem(c, elem);
+ return add_elem(c, elem);
} else {
emit_lithdr_noidx(c, dynidx(c, indices_key), elem, st);
- grpc_mdelem_unref(elem);
+ return elem;
}
- return;
+ abort();
}
/* no elem, key in the table... fall back to literal emission */
if (should_add_elem) {
emit_lithdr_incidx_v(c, elem, st);
- add_elem(c, elem);
+ return add_elem(c, elem);
} else {
emit_lithdr_noidx_v(c, elem, st);
- grpc_mdelem_unref(elem);
+ return elem;
}
+ abort();
}
#define STRLEN_LIT(x) (sizeof(x) - 1)
@@ -433,11 +430,13 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem,
static void deadline_enc(grpc_chttp2_hpack_compressor *c, gpr_timespec deadline,
framer_state *st) {
char timeout_str[GRPC_CHTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE];
+ grpc_mdelem *mdelem;
grpc_chttp2_encode_timeout(gpr_time_sub(deadline, gpr_now()), timeout_str);
- hpack_enc(c, grpc_mdelem_from_metadata_strings(
- c->mdctx, grpc_mdstr_ref(c->timeout_key_str),
- grpc_mdstr_from_string(c->mdctx, timeout_str)),
- st);
+ mdelem = grpc_mdelem_from_metadata_strings(
+ c->mdctx, grpc_mdstr_ref(c->timeout_key_str),
+ grpc_mdstr_from_string(c->mdctx, timeout_str));
+ mdelem = hpack_enc(c, mdelem, st);
+ if (mdelem) grpc_mdelem_unref(mdelem);
}
gpr_slice grpc_chttp2_data_frame_create_empty_close(gpr_uint32 id) {
@@ -480,10 +479,9 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count,
/* skip */
curop++;
break;
- case GRPC_OP_FLOW_CTL_CB:
- case GRPC_OP_DEADLINE:
case GRPC_OP_METADATA:
- case GRPC_OP_METADATA_BOUNDARY:
+ grpc_metadata_batch_assert_ok(&op->data.metadata);
+ case GRPC_OP_FLOW_CTL_CB:
/* these just get copied as they don't impact the number of flow
controlled bytes */
grpc_sopb_append(outops, op, 1);
@@ -530,6 +528,12 @@ exit_loop:
*inops_count -= curop;
memmove(inops, inops + curop, *inops_count * sizeof(grpc_stream_op));
+ for (curop = 0; curop < *inops_count; curop++) {
+ if (inops[curop].type == GRPC_OP_METADATA) {
+ grpc_metadata_batch_assert_ok(&inops[curop].data.metadata);
+ }
+ }
+
return flow_controlled_bytes_taken;
}
@@ -542,6 +546,10 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
grpc_stream_op *op;
gpr_uint32 max_take_size;
gpr_uint32 curop = 0;
+ gpr_uint32 unref_op;
+ grpc_mdctx *mdctx = compressor->mdctx;
+ grpc_linked_mdelem *l;
+ int need_unref = 0;
GPR_ASSERT(stream_id != 0);
@@ -564,14 +572,19 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
curop++;
break;
case GRPC_OP_METADATA:
- hpack_enc(compressor, op->data.metadata, &st);
- curop++;
- break;
- case GRPC_OP_DEADLINE:
- deadline_enc(compressor, op->data.deadline, &st);
- curop++;
- break;
- case GRPC_OP_METADATA_BOUNDARY:
+ /* Encode a metadata batch; store the returned values, representing
+ a metadata element that needs to be unreffed back into the metadata
+ slot. THIS MAY NOT BE THE SAME ELEMENT (if a decoder table slot got
+ updated). After this loop, we'll do a batch unref of elements. */
+ need_unref |= op->data.metadata.garbage.head != NULL;
+ grpc_metadata_batch_assert_ok(&op->data.metadata);
+ for (l = op->data.metadata.list.head; l; l = l->next) {
+ l->md = hpack_enc(compressor, l->md, &st);
+ need_unref |= l->md != NULL;
+ }
+ if (gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future) != 0) {
+ deadline_enc(compressor, op->data.metadata.deadline, &st);
+ }
ensure_frame_type(&st, HEADER, 0);
finish_frame(&st, 1, 0);
st.last_was_header = 0; /* force a new header frame */
@@ -601,4 +614,19 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
begin_frame(&st, DATA);
}
finish_frame(&st, 1, eof);
+
+ if (need_unref) {
+ grpc_mdctx_lock(mdctx);
+ for (unref_op = 0; unref_op < curop; unref_op++) {
+ op = &ops[unref_op];
+ if (op->type != GRPC_OP_METADATA) continue;
+ for (l = op->data.metadata.list.head; l; l = l->next) {
+ if (l->md) grpc_mdctx_locked_mdelem_unref(mdctx, l->md);
+ }
+ for (l = op->data.metadata.garbage.head; l; l = l->next) {
+ grpc_mdctx_locked_mdelem_unref(mdctx, l->md);
+ }
+ }
+ grpc_mdctx_unlock(mdctx);
+ }
}