aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport/chttp2
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-11-02 14:16:12 -0800
committerGravatar Craig Tiller <ctiller@google.com>2015-11-02 14:16:12 -0800
commit9d35a1f9ce98e446d9036d64227b3c9e9317177a (patch)
tree77f2d34117faffcf5649fbd6bd66a850eea4e4d3 /src/core/transport/chttp2
parente8b5f627ccae2db13e366059c3738e96ac9a5d29 (diff)
stream_op cleanup: transport changes
Diffstat (limited to 'src/core/transport/chttp2')
-rw-r--r--src/core/transport/chttp2/frame_data.c90
-rw-r--r--src/core/transport/chttp2/frame_data.h23
-rw-r--r--src/core/transport/chttp2/frame_window_update.c15
-rw-r--r--src/core/transport/chttp2/hpack_encoder.c (renamed from src/core/transport/chttp2/stream_encoder.c)289
-rw-r--r--src/core/transport/chttp2/hpack_encoder.h (renamed from src/core/transport/chttp2/stream_encoder.h)22
-rw-r--r--src/core/transport/chttp2/hpack_parser.c16
-rw-r--r--src/core/transport/chttp2/incoming_metadata.c136
-rw-r--r--src/core/transport/chttp2/incoming_metadata.h24
-rw-r--r--src/core/transport/chttp2/internal.h346
-rw-r--r--src/core/transport/chttp2/parsing.c368
-rw-r--r--src/core/transport/chttp2/stream_lists.c112
-rw-r--r--src/core/transport/chttp2/writing.c330
12 files changed, 933 insertions, 838 deletions
diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c
index 07179a4571..7287f97aaa 100644
--- a/src/core/transport/chttp2/frame_data.c
+++ b/src/core/transport/chttp2/frame_data.c
@@ -45,12 +45,15 @@
grpc_chttp2_parse_error grpc_chttp2_data_parser_init(
grpc_chttp2_data_parser *parser) {
parser->state = GRPC_CHTTP2_DATA_FH_0;
- grpc_sopb_init(&parser->incoming_sopb);
return GRPC_CHTTP2_PARSE_OK;
}
void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser *parser) {
- grpc_sopb_destroy(&parser->incoming_sopb);
+ grpc_byte_stream *bs;
+ while (
+ (bs = grpc_chttp2_incoming_frame_queue_pop(&parser->incoming_frames))) {
+ grpc_byte_stream_destroy(bs);
+ }
}
grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame(
@@ -69,6 +72,62 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame(
return GRPC_CHTTP2_PARSE_OK;
}
+void grpc_chttp2_incoming_frame_queue_merge(
+ grpc_chttp2_incoming_frame_queue *head_dst,
+ grpc_chttp2_incoming_frame_queue *tail_src) {
+ if (tail_src->head == NULL) {
+ return;
+ }
+
+ if (head_dst->head == NULL) {
+ *head_dst = *tail_src;
+ memset(tail_src, 0, sizeof(*tail_src));
+ return;
+ }
+
+ head_dst->tail->next_message = tail_src->head;
+ head_dst->tail = tail_src->tail;
+ memset(tail_src, 0, sizeof(*tail_src));
+}
+
+grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop(
+ grpc_chttp2_incoming_frame_queue *q) {
+ grpc_byte_stream *out;
+ if (q->head == NULL) {
+ return NULL;
+ }
+ out = &q->head->base;
+ if (q->head == q->tail) {
+ memset(q, 0, sizeof(*q));
+ } else {
+ q->head = q->head->next_message;
+ }
+ return out;
+}
+
+void grpc_chttp2_encode_data(gpr_uint32 id, gpr_slice_buffer *inbuf,
+ gpr_uint32 write_bytes, int is_eof,
+ gpr_slice_buffer *outbuf) {
+ gpr_slice hdr;
+ gpr_uint8 *p;
+
+ hdr = gpr_slice_malloc(9);
+ p = GPR_SLICE_START_PTR(hdr);
+ GPR_ASSERT(write_bytes < 16777316);
+ *p++ = (gpr_uint8)(write_bytes >> 16);
+ *p++ = (gpr_uint8)(write_bytes >> 8);
+ *p++ = (gpr_uint8)(write_bytes);
+ *p++ = GRPC_CHTTP2_FRAME_DATA;
+ *p++ = is_eof ? GRPC_CHTTP2_DATA_FLAG_END_STREAM : 0;
+ *p++ = (gpr_uint8)(id >> 24);
+ *p++ = (gpr_uint8)(id >> 16);
+ *p++ = (gpr_uint8)(id >> 8);
+ *p++ = (gpr_uint8)(id);
+ gpr_slice_buffer_add(outbuf, hdr);
+
+ gpr_slice_buffer_move_first(inbuf, write_bytes, outbuf);
+}
+
grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport_parsing *transport_parsing,
@@ -77,7 +136,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
gpr_uint8 *const end = GPR_SLICE_END_PTR(slice);
gpr_uint8 *cur = beg;
grpc_chttp2_data_parser *p = parser;
- gpr_uint32 message_flags = 0;
+ gpr_uint32 message_flags;
+ grpc_chttp2_incoming_byte_stream *incoming_byte_stream;
if (is_last && p->is_last_frame) {
stream_parsing->received_close = 1;
@@ -132,11 +192,14 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
p->frame_size |= ((gpr_uint32)*cur);
p->state = GRPC_CHTTP2_DATA_FRAME;
++cur;
+ message_flags = 0;
if (p->is_frame_compressed) {
message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
}
- grpc_sopb_add_begin_message(&p->incoming_sopb, p->frame_size,
- message_flags);
+ p->parsing_frame = incoming_byte_stream =
+ grpc_chttp2_incoming_byte_stream_create(
+ transport_parsing, stream_parsing, p->frame_size, message_flags,
+ &p->incoming_frames);
/* fallthrough */
case GRPC_CHTTP2_DATA_FRAME:
if (cur == end) {
@@ -147,20 +210,23 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing,
stream_parsing);
if ((gpr_uint32)(end - cur) == p->frame_size) {
- grpc_sopb_add_slice(
- &p->incoming_sopb,
+ grpc_chttp2_incoming_byte_stream_push(
+ exec_ctx, p->parsing_frame,
gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
+ grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame);
p->state = GRPC_CHTTP2_DATA_FH_0;
return GRPC_CHTTP2_PARSE_OK;
} else if ((gpr_uint32)(end - cur) > p->frame_size) {
- grpc_sopb_add_slice(&p->incoming_sopb,
- gpr_slice_sub(slice, (size_t)(cur - beg),
- (size_t)(cur + p->frame_size - beg)));
+ grpc_chttp2_incoming_byte_stream_push(
+ exec_ctx, p->parsing_frame,
+ gpr_slice_sub(slice, (size_t)(cur - beg),
+ (size_t)(cur + p->frame_size - beg)));
+ grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame);
cur += p->frame_size;
goto fh_0; /* loop */
} else {
- grpc_sopb_add_slice(
- &p->incoming_sopb,
+ grpc_chttp2_incoming_byte_stream_push(
+ exec_ctx, p->parsing_frame,
gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
GPR_ASSERT((size_t)(end - cur) <= p->frame_size);
p->frame_size -= (gpr_uint32)(end - cur);
diff --git a/src/core/transport/chttp2/frame_data.h b/src/core/transport/chttp2/frame_data.h
index 6762484e5b..bc32f29d97 100644
--- a/src/core/transport/chttp2/frame_data.h
+++ b/src/core/transport/chttp2/frame_data.h
@@ -39,7 +39,7 @@
#include "src/core/iomgr/exec_ctx.h"
#include <grpc/support/slice.h>
#include <grpc/support/slice_buffer.h>
-#include "src/core/transport/stream_op.h"
+#include "src/core/transport/byte_stream.h"
#include "src/core/transport/chttp2/frame.h"
typedef enum {
@@ -51,6 +51,14 @@ typedef enum {
GRPC_CHTTP2_DATA_FRAME
} grpc_chttp2_stream_state;
+typedef struct grpc_chttp2_incoming_byte_stream
+ grpc_chttp2_incoming_byte_stream;
+
+typedef struct grpc_chttp2_incoming_frame_queue {
+ grpc_chttp2_incoming_byte_stream *head;
+ grpc_chttp2_incoming_byte_stream *tail;
+} grpc_chttp2_incoming_frame_queue;
+
typedef struct {
grpc_chttp2_stream_state state;
gpr_uint8 is_last_frame;
@@ -58,9 +66,16 @@ typedef struct {
gpr_uint32 frame_size;
int is_frame_compressed;
- grpc_stream_op_buffer incoming_sopb;
+ grpc_chttp2_incoming_frame_queue incoming_frames;
+ grpc_chttp2_incoming_byte_stream *parsing_frame;
} grpc_chttp2_data_parser;
+void grpc_chttp2_incoming_frame_queue_merge(
+ grpc_chttp2_incoming_frame_queue *head_dst,
+ grpc_chttp2_incoming_frame_queue *tail_src);
+grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop(
+ grpc_chttp2_incoming_frame_queue *q);
+
/* initialize per-stream state for data frame parsing */
grpc_chttp2_parse_error grpc_chttp2_data_parser_init(
grpc_chttp2_data_parser *parser);
@@ -81,4 +96,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
/* create a slice with an empty data frame and is_last set */
gpr_slice grpc_chttp2_data_frame_create_empty_close(gpr_uint32 id);
+void grpc_chttp2_encode_data(gpr_uint32 id, gpr_slice_buffer *inbuf,
+ gpr_uint32 write_bytes, int is_eof,
+ gpr_slice_buffer *outbuf);
+
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_DATA_H */
diff --git a/src/core/transport/chttp2/frame_window_update.c b/src/core/transport/chttp2/frame_window_update.c
index 91bbcfe2c1..74ca29baf9 100644
--- a/src/core/transport/chttp2/frame_window_update.c
+++ b/src/core/transport/chttp2/frame_window_update.c
@@ -89,7 +89,8 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse(
}
if (p->byte == 4) {
- if (p->amount == 0 || (p->amount & 0x80000000u)) {
+ gpr_uint32 received_update = p->amount;
+ if (received_update == 0 || (received_update & 0x80000000u)) {
gpr_log(GPR_ERROR, "invalid window update bytes: %d", p->amount);
return GRPC_CHTTP2_CONNECTION_ERROR;
}
@@ -97,17 +98,15 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse(
if (transport_parsing->incoming_stream_id != 0) {
if (stream_parsing != NULL) {
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("update", transport_parsing,
- stream_parsing, outgoing_window_update,
- p->amount);
- stream_parsing->outgoing_window_update += p->amount;
+ GRPC_CHTTP2_FLOW_CREDIT_STREAM("parse", transport_parsing,
+ stream_parsing, outgoing_window,
+ received_update);
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing,
stream_parsing);
}
} else {
- GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT("update", transport_parsing,
- outgoing_window_update, p->amount);
- transport_parsing->outgoing_window_update += p->amount;
+ GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parse", transport_parsing,
+ outgoing_window, received_update);
}
}
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/hpack_encoder.c
index 24a5d958b8..6c7c00b7c3 100644
--- a/src/core/transport/chttp2/stream_encoder.c
+++ b/src/core/transport/chttp2/hpack_encoder.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/transport/chttp2/stream_encoder.h"
+#include "src/core/transport/chttp2/hpack_encoder.h"
#include <assert.h>
#include <string.h>
@@ -54,18 +54,13 @@
/* don't consider adding anything bigger than this to the hpack table */
#define MAX_DECODER_SPACE_USAGE 512
-/* what kind of frame our we encoding? */
-typedef enum { HEADER, DATA, NONE } frame_type;
-
typedef struct {
- frame_type cur_frame_type;
+ int is_first_frame;
/* number of bytes in 'output' when we started the frame - used to calculate
frame length */
size_t output_length_at_start_of_frame;
/* index (in output) of the header for the current frame */
size_t header_idx;
- /* was the last frame emitted a header? (if yes, we'll need a CONTINUATION */
- gpr_uint8 last_was_header;
/* have we seen a regular (non-colon-prefixed) header yet? */
gpr_uint8 seen_regular_header;
/* output stream id */
@@ -92,58 +87,35 @@ static void fill_header(gpr_uint8 *p, gpr_uint8 type, gpr_uint32 id, size_t len,
static void finish_frame(framer_state *st, int is_header_boundary,
int is_last_in_stream) {
gpr_uint8 type = 0xff;
- switch (st->cur_frame_type) {
- case HEADER:
- type = st->last_was_header ? GRPC_CHTTP2_FRAME_CONTINUATION
- : GRPC_CHTTP2_FRAME_HEADER;
- st->last_was_header = 1;
- break;
- case DATA:
- type = GRPC_CHTTP2_FRAME_DATA;
- st->last_was_header = 0;
- is_header_boundary = 0;
- break;
- case NONE:
- return;
- }
+ type = st->is_first_frame ? GRPC_CHTTP2_FRAME_HEADER
+ : GRPC_CHTTP2_FRAME_CONTINUATION;
fill_header(
GPR_SLICE_START_PTR(st->output->slices[st->header_idx]), type,
st->stream_id, st->output->length - st->output_length_at_start_of_frame,
(gpr_uint8)(
(is_last_in_stream ? GRPC_CHTTP2_DATA_FLAG_END_STREAM : 0) |
(is_header_boundary ? GRPC_CHTTP2_DATA_FLAG_END_HEADERS : 0)));
- st->cur_frame_type = NONE;
+ st->is_first_frame = 0;
}
/* begin a new frame: reserve off header space, remember how many bytes we'd
output before beginning */
-static void begin_frame(framer_state *st, frame_type type) {
- GPR_ASSERT(type != NONE);
- GPR_ASSERT(st->cur_frame_type == NONE);
- st->cur_frame_type = type;
+static void begin_frame(framer_state *st) {
st->header_idx =
gpr_slice_buffer_add_indexed(st->output, gpr_slice_malloc(9));
st->output_length_at_start_of_frame = st->output->length;
}
-static void begin_new_frame(framer_state *st, frame_type type) {
- finish_frame(st, 1, 0);
- st->last_was_header = 0;
- begin_frame(st, type);
-}
-
/* make sure that the current frame is of the type desired, and has sufficient
space to add at least about_to_add bytes -- finishes the current frame if
needed */
-static void ensure_frame_type(framer_state *st, frame_type type,
- size_t need_bytes) {
- if (st->cur_frame_type == type &&
- st->output->length - st->output_length_at_start_of_frame + need_bytes <=
- GRPC_CHTTP2_MAX_PAYLOAD_LENGTH) {
+static void ensure_space(framer_state *st, size_t need_bytes) {
+ if (st->output->length - st->output_length_at_start_of_frame + need_bytes <=
+ GRPC_CHTTP2_MAX_PAYLOAD_LENGTH) {
return;
}
- finish_frame(st, type != HEADER, 0);
- begin_frame(st, type);
+ finish_frame(st, 0, 0);
+ begin_frame(st);
}
/* increment a filter count, halve all counts if one element reaches max */
@@ -165,31 +137,30 @@ static void add_header_data(framer_state *st, gpr_slice slice) {
size_t len = GPR_SLICE_LENGTH(slice);
size_t remaining;
if (len == 0) return;
- ensure_frame_type(st, HEADER, 1);
remaining = GRPC_CHTTP2_MAX_PAYLOAD_LENGTH +
st->output_length_at_start_of_frame - st->output->length;
if (len <= remaining) {
gpr_slice_buffer_add(st->output, slice);
} else {
gpr_slice_buffer_add(st->output, gpr_slice_split_head(&slice, remaining));
+ finish_frame(st, 0, 0);
+ begin_frame(st);
add_header_data(st, slice);
}
}
static gpr_uint8 *add_tiny_header_data(framer_state *st, size_t len) {
- ensure_frame_type(st, HEADER, len);
+ ensure_space(st, len);
return gpr_slice_buffer_tiny_add(st->output, len);
}
-/* add an element to the decoder table: returns metadata element to unref */
-static grpc_mdelem *add_elem(grpc_chttp2_hpack_compressor *c,
- grpc_mdelem *elem) {
+/* add an element to the decoder table */
+static void add_elem(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem) {
gpr_uint32 key_hash = elem->key->hash;
gpr_uint32 elem_hash = GRPC_MDSTR_KV_HASH(key_hash, elem->value->hash);
gpr_uint32 new_index = c->tail_remote_index + c->table_elems + 1;
size_t elem_size = 32 + GPR_SLICE_LENGTH(elem->key->slice) +
GPR_SLICE_LENGTH(elem->value->slice);
- grpc_mdelem *elem_to_unref;
GPR_ASSERT(elem_size < 65536);
@@ -220,31 +191,27 @@ static grpc_mdelem *add_elem(grpc_chttp2_hpack_compressor *c,
if (c->entries_elems[HASH_FRAGMENT_2(elem_hash)] == elem) {
/* already there: update with new index */
c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index;
- elem_to_unref = elem;
} else if (c->entries_elems[HASH_FRAGMENT_3(elem_hash)] == elem) {
/* already there (cuckoo): update with new index */
c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index;
- elem_to_unref = elem;
} else if (c->entries_elems[HASH_FRAGMENT_2(elem_hash)] == NULL) {
/* not there, but a free element: add */
- c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = elem;
+ c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = GRPC_MDELEM_REF(elem);
c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index;
- elem_to_unref = NULL;
} else if (c->entries_elems[HASH_FRAGMENT_3(elem_hash)] == NULL) {
/* not there (cuckoo), but a free element: add */
- c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = elem;
+ c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = GRPC_MDELEM_REF(elem);
c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index;
- elem_to_unref = NULL;
} else if (c->indices_elems[HASH_FRAGMENT_2(elem_hash)] <
c->indices_elems[HASH_FRAGMENT_3(elem_hash)]) {
/* not there: replace oldest */
- elem_to_unref = c->entries_elems[HASH_FRAGMENT_2(elem_hash)];
- c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = elem;
+ GRPC_MDELEM_UNREF(c->entries_elems[HASH_FRAGMENT_2(elem_hash)]);
+ c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = GRPC_MDELEM_REF(elem);
c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index;
} else {
/* not there: replace oldest */
- elem_to_unref = c->entries_elems[HASH_FRAGMENT_3(elem_hash)];
- c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = elem;
+ GRPC_MDELEM_UNREF(c->entries_elems[HASH_FRAGMENT_3(elem_hash)]);
+ c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = GRPC_MDELEM_REF(elem);
c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index;
}
@@ -270,8 +237,6 @@ static grpc_mdelem *add_elem(grpc_chttp2_hpack_compressor *c,
c->entries_keys[HASH_FRAGMENT_3(key_hash)] = GRPC_MDSTR_REF(elem->key);
c->indices_keys[HASH_FRAGMENT_3(key_hash)] = new_index;
}
-
- return elem_to_unref;
}
static void emit_indexed(grpc_chttp2_hpack_compressor *c, gpr_uint32 elem_index,
@@ -370,9 +335,9 @@ static gpr_uint32 dynidx(grpc_chttp2_hpack_compressor *c,
c->table_elems - elem_index;
}
-/* encode an mdelem; returns metadata element to unref */
-static grpc_mdelem *hpack_enc(grpc_chttp2_hpack_compressor *c,
- grpc_mdelem *elem, framer_state *st) {
+/* encode an mdelem */
+static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem,
+ framer_state *st) {
gpr_uint32 key_hash = elem->key->hash;
gpr_uint32 elem_hash = GRPC_MDSTR_KV_HASH(key_hash, elem->value->hash);
size_t decoder_space_usage;
@@ -397,7 +362,7 @@ static grpc_mdelem *hpack_enc(grpc_chttp2_hpack_compressor *c,
/* HIT: complete element (first cuckoo hash) */
emit_indexed(c, dynidx(c, c->indices_elems[HASH_FRAGMENT_2(elem_hash)]),
st);
- return elem;
+ return;
}
if (c->entries_elems[HASH_FRAGMENT_3(elem_hash)] == elem &&
@@ -405,7 +370,7 @@ static grpc_mdelem *hpack_enc(grpc_chttp2_hpack_compressor *c,
/* HIT: complete element (second cuckoo hash) */
emit_indexed(c, dynidx(c, c->indices_elems[HASH_FRAGMENT_3(elem_hash)]),
st);
- return elem;
+ return;
}
/* should this elem be in the table? */
@@ -423,12 +388,13 @@ static grpc_mdelem *hpack_enc(grpc_chttp2_hpack_compressor *c,
/* HIT: key (first cuckoo hash) */
if (should_add_elem) {
emit_lithdr_incidx(c, dynidx(c, indices_key), elem, st);
- return add_elem(c, elem);
+ add_elem(c, elem);
+ return;
} else {
emit_lithdr_noidx(c, dynidx(c, indices_key), elem, st);
- return elem;
+ return;
}
- GPR_UNREACHABLE_CODE(return NULL);
+ GPR_UNREACHABLE_CODE(return );
}
indices_key = c->indices_keys[HASH_FRAGMENT_3(key_hash)];
@@ -437,24 +403,26 @@ static grpc_mdelem *hpack_enc(grpc_chttp2_hpack_compressor *c,
/* HIT: key (first cuckoo hash) */
if (should_add_elem) {
emit_lithdr_incidx(c, dynidx(c, indices_key), elem, st);
- return add_elem(c, elem);
+ add_elem(c, elem);
+ return;
} else {
emit_lithdr_noidx(c, dynidx(c, indices_key), elem, st);
- return elem;
+ return;
}
- GPR_UNREACHABLE_CODE(return NULL);
+ GPR_UNREACHABLE_CODE(return );
}
/* no elem, key in the table... fall back to literal emission */
if (should_add_elem) {
emit_lithdr_incidx_v(c, elem, st);
- return add_elem(c, elem);
+ add_elem(c, elem);
+ return;
} else {
emit_lithdr_noidx_v(c, elem, st);
- return elem;
+ return;
}
- GPR_UNREACHABLE_CODE(return NULL);
+ GPR_UNREACHABLE_CODE(return );
}
#define STRLEN_LIT(x) (sizeof(x) - 1)
@@ -469,8 +437,8 @@ static void deadline_enc(grpc_chttp2_hpack_compressor *c, gpr_timespec deadline,
mdelem = grpc_mdelem_from_metadata_strings(
c->mdctx, GRPC_MDSTR_REF(c->timeout_key_str),
grpc_mdstr_from_string(c->mdctx, timeout_str));
- mdelem = hpack_enc(c, mdelem, st);
- if (mdelem) GRPC_MDELEM_UNREF(mdelem);
+ hpack_enc(c, mdelem, st);
+ GRPC_MDELEM_UNREF(mdelem);
}
gpr_slice grpc_chttp2_data_frame_create_empty_close(gpr_uint32 id) {
@@ -495,169 +463,34 @@ void grpc_chttp2_hpack_compressor_destroy(grpc_chttp2_hpack_compressor *c) {
GRPC_MDSTR_UNREF(c->timeout_key_str);
}
-gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count,
- gpr_uint32 max_flow_controlled_bytes,
- grpc_stream_op_buffer *outops) {
- gpr_slice slice;
- grpc_stream_op *op;
- gpr_uint32 max_take_size;
- gpr_uint32 flow_controlled_bytes_taken = 0;
- gpr_uint32 curop = 0;
- gpr_uint8 *p;
- gpr_uint8 compressed_flag_set = 0;
-
- while (curop < *inops_count) {
- GPR_ASSERT(flow_controlled_bytes_taken <= max_flow_controlled_bytes);
- op = &inops[curop];
- switch (op->type) {
- case GRPC_NO_OP:
- /* skip */
- curop++;
- break;
- case GRPC_OP_METADATA:
- grpc_metadata_batch_assert_ok(&op->data.metadata);
- /* these just get copied as they don't impact the number of flow
- controlled bytes */
- grpc_sopb_append(outops, op, 1);
- curop++;
- break;
- case GRPC_OP_BEGIN_MESSAGE:
- /* begin op: for now we just convert the op to a slice and fall
- through - this lets us reuse the slice framing code below */
- compressed_flag_set =
- (op->data.begin_message.flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
- slice = gpr_slice_malloc(5);
-
- p = GPR_SLICE_START_PTR(slice);
- p[0] = compressed_flag_set;
- p[1] = (gpr_uint8)(op->data.begin_message.length >> 24);
- p[2] = (gpr_uint8)(op->data.begin_message.length >> 16);
- p[3] = (gpr_uint8)(op->data.begin_message.length >> 8);
- p[4] = (gpr_uint8)(op->data.begin_message.length);
- op->type = GRPC_OP_SLICE;
- op->data.slice = slice;
- /* fallthrough */
- case GRPC_OP_SLICE:
- slice = op->data.slice;
- if (!GPR_SLICE_LENGTH(slice)) {
- /* skip zero length slices */
- gpr_slice_unref(slice);
- curop++;
- break;
- }
- max_take_size = max_flow_controlled_bytes - flow_controlled_bytes_taken;
- if (max_take_size == 0) {
- goto exit_loop;
- }
- if (GPR_SLICE_LENGTH(slice) > max_take_size) {
- slice = gpr_slice_split_head(&op->data.slice, max_take_size);
- grpc_sopb_add_slice(outops, slice);
- } else {
- /* consume this op immediately */
- grpc_sopb_append(outops, op, 1);
- curop++;
- }
- flow_controlled_bytes_taken += (gpr_uint32)GPR_SLICE_LENGTH(slice);
- break;
- }
- }
-exit_loop:
- *inops_count -= curop;
- memmove(inops, inops + curop, *inops_count * sizeof(grpc_stream_op));
-
- for (curop = 0; curop < *inops_count; curop++) {
- if (inops[curop].type == GRPC_OP_METADATA) {
- grpc_metadata_batch_assert_ok(&inops[curop].data.metadata);
- }
- }
-
- return flow_controlled_bytes_taken;
-}
-
-void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
- gpr_uint32 stream_id,
- grpc_chttp2_hpack_compressor *compressor,
- gpr_slice_buffer *output) {
+void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c,
+ gpr_uint32 stream_id,
+ grpc_metadata_batch *metadata, int is_eof,
+ gpr_slice_buffer *outbuf) {
framer_state st;
- gpr_slice slice;
- grpc_stream_op *op;
- size_t max_take_size;
- gpr_uint32 curop = 0;
- gpr_uint32 unref_op;
grpc_linked_mdelem *l;
- int need_unref = 0;
gpr_timespec deadline;
GPR_ASSERT(stream_id != 0);
- st.cur_frame_type = NONE;
- st.last_was_header = 0;
st.seen_regular_header = 0;
st.stream_id = stream_id;
- st.output = output;
-
- while (curop < ops_count) {
- op = &ops[curop];
- switch (op->type) {
- case GRPC_NO_OP:
- case GRPC_OP_BEGIN_MESSAGE:
- gpr_log(
- GPR_ERROR,
- "These stream ops should be filtered out by grpc_chttp2_preencode");
- abort();
- case GRPC_OP_METADATA:
- /* Encode a metadata batch; store the returned values, representing
- a metadata element that needs to be unreffed back into the metadata
- slot. THIS MAY NOT BE THE SAME ELEMENT (if a decoder table slot got
- updated). After this loop, we'll do a batch unref of elements. */
- begin_new_frame(&st, HEADER);
- need_unref |= op->data.metadata.garbage.head != NULL;
- grpc_metadata_batch_assert_ok(&op->data.metadata);
- for (l = op->data.metadata.list.head; l; l = l->next) {
- l->md = hpack_enc(compressor, l->md, &st);
- need_unref |= l->md != NULL;
- }
- deadline = op->data.metadata.deadline;
- if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) != 0) {
- deadline_enc(compressor, deadline, &st);
- }
- curop++;
- break;
- case GRPC_OP_SLICE:
- slice = op->data.slice;
- if (st.cur_frame_type == DATA &&
- st.output->length - st.output_length_at_start_of_frame ==
- GRPC_CHTTP2_MAX_PAYLOAD_LENGTH) {
- finish_frame(&st, 0, 0);
- }
- ensure_frame_type(&st, DATA, 1);
- max_take_size = GRPC_CHTTP2_MAX_PAYLOAD_LENGTH +
- st.output_length_at_start_of_frame - st.output->length;
- if (GPR_SLICE_LENGTH(slice) > max_take_size) {
- slice = gpr_slice_split_head(&op->data.slice, max_take_size);
- } else {
- /* consume this op immediately */
- curop++;
- }
- gpr_slice_buffer_add(output, slice);
- break;
- }
+ st.output = outbuf;
+ st.is_first_frame = 1;
+
+ /* Encode a metadata batch; store the returned values, representing
+ a metadata element that needs to be unreffed back into the metadata
+ slot. THIS MAY NOT BE THE SAME ELEMENT (if a decoder table slot got
+ updated). After this loop, we'll do a batch unref of elements. */
+ begin_frame(&st);
+ grpc_metadata_batch_assert_ok(metadata);
+ for (l = metadata->list.head; l; l = l->next) {
+ hpack_enc(c, l->md, &st);
}
- if (eof && st.cur_frame_type == NONE) {
- begin_frame(&st, DATA);
- }
- finish_frame(&st, 1, eof);
-
- if (need_unref) {
- for (unref_op = 0; unref_op < curop; unref_op++) {
- op = &ops[unref_op];
- if (op->type != GRPC_OP_METADATA) continue;
- for (l = op->data.metadata.list.head; l; l = l->next) {
- if (l->md) GRPC_MDELEM_UNREF(l->md);
- }
- for (l = op->data.metadata.garbage.head; l; l = l->next) {
- GRPC_MDELEM_UNREF(l->md);
- }
- }
+ deadline = metadata->deadline;
+ if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) != 0) {
+ deadline_enc(c, deadline, &st);
}
+
+ finish_frame(&st, 1, is_eof);
}
diff --git a/src/core/transport/chttp2/stream_encoder.h b/src/core/transport/chttp2/hpack_encoder.h
index db52f2a0f6..59b122dfda 100644
--- a/src/core/transport/chttp2/stream_encoder.h
+++ b/src/core/transport/chttp2/hpack_encoder.h
@@ -31,12 +31,12 @@
*
*/
-#ifndef GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_STREAM_ENCODER_H
-#define GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_STREAM_ENCODER_H
+#ifndef GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_HPACK_ENCODER_H
+#define GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_HPACK_ENCODER_H
#include "src/core/transport/chttp2/frame.h"
#include "src/core/transport/metadata.h"
-#include "src/core/transport/stream_op.h"
+#include "src/core/transport/metadata_batch.h"
#include <grpc/support/port_platform.h>
#include <grpc/support/slice.h>
#include <grpc/support/slice_buffer.h>
@@ -78,16 +78,8 @@ void grpc_chttp2_hpack_compressor_init(grpc_chttp2_hpack_compressor *c,
grpc_mdctx *mdctx);
void grpc_chttp2_hpack_compressor_destroy(grpc_chttp2_hpack_compressor *c);
-/* select stream ops to be encoded, moving them from inops to outops, and
- moving subsequent ops in inops forward in the queue */
-gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count,
- gpr_uint32 max_flow_controlled_bytes,
- grpc_stream_op_buffer *outops);
+void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c, gpr_uint32 id,
+ grpc_metadata_batch *metadata, int is_eof,
+ gpr_slice_buffer *outbuf);
-/* encode stream ops to output */
-void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
- gpr_uint32 stream_id,
- grpc_chttp2_hpack_compressor *compressor,
- gpr_slice_buffer *output);
-
-#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_STREAM_ENCODER_H */
+#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_HPACK_ENCODER_H */
diff --git a/src/core/transport/chttp2/hpack_parser.c b/src/core/transport/chttp2/hpack_parser.c
index 20d8312d54..6eebfc3ce4 100644
--- a/src/core/transport/chttp2/hpack_parser.c
+++ b/src/core/transport/chttp2/hpack_parser.c
@@ -38,13 +38,15 @@
#include <string.h>
#include <assert.h>
-#include "src/core/transport/chttp2/bin_encoder.h"
-#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/useful.h>
+#include "src/core/profiling/timers.h"
+#include "src/core/support/string.h"
+#include "src/core/transport/chttp2/bin_encoder.h"
+
typedef enum {
NOT_BINARY,
B64_BYTE0,
@@ -1379,20 +1381,23 @@ grpc_chttp2_parse_error grpc_chttp2_header_parser_parse(
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) {
grpc_chttp2_hpack_parser *parser = hpack_parser;
+ GPR_TIMER_BEGIN("grpc_chttp2_hpack_parser_parse", 0);
if (!grpc_chttp2_hpack_parser_parse(parser, GPR_SLICE_START_PTR(slice),
GPR_SLICE_END_PTR(slice))) {
+ GPR_TIMER_END("grpc_chttp2_hpack_parser_parse", 0);
return GRPC_CHTTP2_CONNECTION_ERROR;
}
if (is_last) {
if (parser->is_boundary && parser->state != parse_begin) {
gpr_log(GPR_ERROR,
"end of header frame not aligned with a hpack record boundary");
+ GPR_TIMER_END("grpc_chttp2_hpack_parser_parse", 0);
return GRPC_CHTTP2_CONNECTION_ERROR;
}
if (parser->is_boundary) {
- grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(
- &stream_parsing->incoming_metadata,
- &stream_parsing->data_parser.incoming_sopb);
+ stream_parsing
+ ->got_metadata_on_parse[stream_parsing->header_frames_received] = 1;
+ stream_parsing->header_frames_received++;
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing,
stream_parsing);
}
@@ -1404,5 +1409,6 @@ grpc_chttp2_parse_error grpc_chttp2_header_parser_parse(
parser->is_boundary = 0xde;
parser->is_eof = 0xde;
}
+ GPR_TIMER_END("grpc_chttp2_hpack_parser_parse", 0);
return GRPC_CHTTP2_PARSE_OK;
}
diff --git a/src/core/transport/chttp2/incoming_metadata.c b/src/core/transport/chttp2/incoming_metadata.c
index 10c64f3356..956afc8e9d 100644
--- a/src/core/transport/chttp2/incoming_metadata.c
+++ b/src/core/transport/chttp2/incoming_metadata.c
@@ -48,14 +48,27 @@ void grpc_chttp2_incoming_metadata_buffer_init(
void grpc_chttp2_incoming_metadata_buffer_destroy(
grpc_chttp2_incoming_metadata_buffer *buffer) {
size_t i;
+ if (!buffer->published) {
+ for (i = 0; i < buffer->count; i++) {
+ GRPC_MDELEM_UNREF(buffer->elems[i].md);
+ }
+ }
+ gpr_free(buffer->elems);
+}
+
+void grpc_chttp2_incoming_metadata_buffer_reset(
+ grpc_chttp2_incoming_metadata_buffer *buffer) {
+ size_t i;
+ GPR_ASSERT(!buffer->published);
for (i = 0; i < buffer->count; i++) {
GRPC_MDELEM_UNREF(buffer->elems[i].md);
}
- gpr_free(buffer->elems);
+ buffer->count = 0;
}
void grpc_chttp2_incoming_metadata_buffer_add(
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem *elem) {
+ GPR_ASSERT(!buffer->published);
if (buffer->capacity == buffer->count) {
buffer->capacity = GPR_MAX(8, 2 * buffer->capacity);
buffer->elems =
@@ -66,117 +79,36 @@ void grpc_chttp2_incoming_metadata_buffer_add(
void grpc_chttp2_incoming_metadata_buffer_set_deadline(
grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline) {
+ GPR_ASSERT(!buffer->published);
buffer->deadline = deadline;
}
-void grpc_chttp2_incoming_metadata_live_op_buffer_end(
- grpc_chttp2_incoming_metadata_live_op_buffer *buffer) {
- gpr_free(buffer->elems);
- buffer->elems = NULL;
-}
-
-void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(
- grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb) {
- grpc_metadata_batch b;
-
- b.list.head = NULL;
- /* Store away the last element of the list, so that in patch_metadata_ops
- we can reconstitute the list.
- We can't do list building here as later incoming metadata may reallocate
- the underlying array. */
- b.list.tail = (void *)(gpr_intptr)buffer->count;
- b.garbage.head = b.garbage.tail = NULL;
- b.deadline = buffer->deadline;
- buffer->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
-
- grpc_sopb_add_metadata(sopb, b);
-}
-
void grpc_chttp2_incoming_metadata_buffer_swap(
grpc_chttp2_incoming_metadata_buffer *a,
grpc_chttp2_incoming_metadata_buffer *b) {
+ GPR_ASSERT(!a->published);
+ GPR_ASSERT(!b->published);
GPR_SWAP(grpc_chttp2_incoming_metadata_buffer, *a, *b);
}
-void grpc_incoming_metadata_buffer_move_to_referencing_sopb(
- grpc_chttp2_incoming_metadata_buffer *src,
- grpc_chttp2_incoming_metadata_buffer *dst, grpc_stream_op_buffer *sopb) {
- size_t delta;
- size_t i;
- dst->deadline = gpr_time_min(src->deadline, dst->deadline);
-
- if (src->count == 0) {
- return;
- }
- if (dst->count == 0) {
- grpc_chttp2_incoming_metadata_buffer_swap(src, dst);
- return;
- }
- delta = dst->count;
- if (dst->capacity < src->count + dst->count) {
- dst->capacity = GPR_MAX(dst->capacity * 2, src->count + dst->count);
- dst->elems = gpr_realloc(dst->elems, dst->capacity * sizeof(*dst->elems));
- }
- memcpy(dst->elems + dst->count, src->elems, src->count * sizeof(*src->elems));
- dst->count += src->count;
- for (i = 0; i < sopb->nops; i++) {
- if (sopb->ops[i].type != GRPC_OP_METADATA) continue;
- sopb->ops[i].data.metadata.list.tail =
- (void *)(delta + (gpr_uintptr)sopb->ops[i].data.metadata.list.tail);
- }
- src->count = 0;
-}
-
-void grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op(
- grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb,
- grpc_chttp2_incoming_metadata_live_op_buffer *live_op_buffer) {
- grpc_stream_op *ops = sopb->ops;
- size_t nops = sopb->nops;
- size_t i;
- size_t j;
- size_t mdidx = 0;
- size_t last_mdidx;
- int found_metadata = 0;
-
- /* rework the array of metadata into a linked list, making use
- of the breadcrumbs we left in metadata batches during
- add_metadata_batch */
- for (i = 0; i < nops; i++) {
- grpc_stream_op *op = &ops[i];
- if (op->type != GRPC_OP_METADATA) continue;
- found_metadata = 1;
- /* we left a breadcrumb indicating where the end of this list is,
- and since we add sequentially, we know from the end of the last
- segment where this segment begins */
- last_mdidx = (size_t)(gpr_intptr)(op->data.metadata.list.tail);
- GPR_ASSERT(last_mdidx > mdidx);
- GPR_ASSERT(last_mdidx <= buffer->count);
- /* turn the array into a doubly linked list */
- op->data.metadata.list.head = &buffer->elems[mdidx];
- op->data.metadata.list.tail = &buffer->elems[last_mdidx - 1];
- for (j = mdidx + 1; j < last_mdidx; j++) {
- buffer->elems[j].prev = &buffer->elems[j - 1];
- buffer->elems[j - 1].next = &buffer->elems[j];
+void grpc_chttp2_incoming_metadata_buffer_publish(
+ grpc_chttp2_incoming_metadata_buffer *buffer, grpc_metadata_batch *batch) {
+ GPR_ASSERT(!buffer->published);
+ buffer->published = 1;
+ if (buffer->count > 0) {
+ size_t i;
+ for (i = 1; i < buffer->count; i++) {
+ buffer->elems[i].prev = &buffer->elems[i - 1];
}
- buffer->elems[mdidx].prev = NULL;
- buffer->elems[last_mdidx - 1].next = NULL;
- /* track where we're up to */
- mdidx = last_mdidx;
- }
- if (found_metadata) {
- live_op_buffer->elems = buffer->elems;
- if (mdidx != buffer->count) {
- /* we have a partially read metadata batch still in incoming_metadata */
- size_t new_count = buffer->count - mdidx;
- size_t copy_bytes = sizeof(*buffer->elems) * new_count;
- GPR_ASSERT(mdidx < buffer->count);
- buffer->elems = gpr_malloc(copy_bytes);
- memcpy(buffer->elems, live_op_buffer->elems + mdidx, copy_bytes);
- buffer->count = buffer->capacity = new_count;
- } else {
- buffer->elems = NULL;
- buffer->count = 0;
- buffer->capacity = 0;
+ for (i = 0; i < buffer->count - 1; i++) {
+ buffer->elems[i].next = &buffer->elems[i + 1];
}
+ buffer->elems[0].prev = NULL;
+ buffer->elems[buffer->count - 1].next = NULL;
+ batch->list.head = &buffer->elems[0];
+ batch->list.tail = &buffer->elems[buffer->count - 1];
+ } else {
+ batch->list.head = batch->list.tail = NULL;
}
+ batch->deadline = buffer->deadline;
}
diff --git a/src/core/transport/chttp2/incoming_metadata.h b/src/core/transport/chttp2/incoming_metadata.h
index 2f1de411ba..0e1dabe825 100644
--- a/src/core/transport/chttp2/incoming_metadata.h
+++ b/src/core/transport/chttp2/incoming_metadata.h
@@ -41,12 +41,9 @@ typedef struct {
size_t count;
size_t capacity;
gpr_timespec deadline;
+ int published;
} grpc_chttp2_incoming_metadata_buffer;
-typedef struct {
- grpc_linked_mdelem *elems;
-} grpc_chttp2_incoming_metadata_live_op_buffer;
-
/** assumes everything initially zeroed */
void grpc_chttp2_incoming_metadata_buffer_init(
grpc_chttp2_incoming_metadata_buffer *buffer);
@@ -54,27 +51,12 @@ void grpc_chttp2_incoming_metadata_buffer_destroy(
grpc_chttp2_incoming_metadata_buffer *buffer);
void grpc_chttp2_incoming_metadata_buffer_reset(
grpc_chttp2_incoming_metadata_buffer *buffer);
+void grpc_chttp2_incoming_metadata_buffer_publish(
+ grpc_chttp2_incoming_metadata_buffer *buffer, grpc_metadata_batch *batch);
void grpc_chttp2_incoming_metadata_buffer_add(
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem *elem);
void grpc_chttp2_incoming_metadata_buffer_set_deadline(
grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline);
-/** extend sopb with a metadata batch; this must be post-processed by
- grpc_chttp2_incoming_metadata_buffer_postprocess_sopb before being handed
- out of the transport */
-void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(
- grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb);
-
-void grpc_incoming_metadata_buffer_move_to_referencing_sopb(
- grpc_chttp2_incoming_metadata_buffer *src,
- grpc_chttp2_incoming_metadata_buffer *dst, grpc_stream_op_buffer *sopb);
-
-void grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op(
- grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb,
- grpc_chttp2_incoming_metadata_live_op_buffer *live_op_buffer);
-
-void grpc_chttp2_incoming_metadata_live_op_buffer_end(
- grpc_chttp2_incoming_metadata_live_op_buffer *live_op_buffer);
-
#endif /* GRPC_INTERNAL_CORE_CHTTP2_INCOMING_METADATA_H */
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index b35f8b5d88..b53c9dee0b 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -34,6 +34,8 @@
#ifndef GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H
#define GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H
+#include <assert.h>
+
#include "src/core/iomgr/endpoint.h"
#include "src/core/transport/chttp2/frame.h"
#include "src/core/transport/chttp2/frame_data.h"
@@ -42,9 +44,9 @@
#include "src/core/transport/chttp2/frame_rst_stream.h"
#include "src/core/transport/chttp2/frame_settings.h"
#include "src/core/transport/chttp2/frame_window_update.h"
+#include "src/core/transport/chttp2/hpack_encoder.h"
#include "src/core/transport/chttp2/hpack_parser.h"
#include "src/core/transport/chttp2/incoming_metadata.h"
-#include "src/core/transport/chttp2/stream_encoder.h"
#include "src/core/transport/chttp2/stream_map.h"
#include "src/core/transport/connectivity_state.h"
#include "src/core/transport/transport_impl.h"
@@ -56,14 +58,14 @@ typedef struct grpc_chttp2_stream grpc_chttp2_stream;
happen to them... this enum labels each list */
typedef enum {
GRPC_CHTTP2_LIST_ALL_STREAMS,
- GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED,
+ GRPC_CHTTP2_LIST_CHECK_READ_OPS,
+ GRPC_CHTTP2_LIST_UNANNOUNCED_INCOMING_WINDOW_AVAILABLE,
GRPC_CHTTP2_LIST_WRITABLE,
GRPC_CHTTP2_LIST_WRITING,
GRPC_CHTTP2_LIST_WRITTEN,
GRPC_CHTTP2_LIST_PARSING_SEEN,
GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING,
- GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING,
- GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED,
+ GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT,
/** streams that are waiting to start because there are too many concurrent
streams on the connection */
GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY,
@@ -113,22 +115,6 @@ typedef enum {
GRPC_DTS_FRAME
} grpc_chttp2_deframe_transport_state;
-typedef enum {
- GRPC_WRITE_STATE_OPEN,
- GRPC_WRITE_STATE_QUEUED_CLOSE,
- GRPC_WRITE_STATE_SENT_CLOSE
-} grpc_chttp2_write_state;
-
-/* flags that can be or'd into stream_global::writing_now */
-#define GRPC_CHTTP2_WRITING_DATA 1
-#define GRPC_CHTTP2_WRITING_WINDOW 2
-
-typedef enum {
- GRPC_DONT_SEND_CLOSED = 0,
- GRPC_SEND_CLOSED,
- GRPC_SEND_CLOSED_WITH_RST_STREAM
-} grpc_chttp2_send_closed;
-
typedef struct {
grpc_chttp2_stream *head;
grpc_chttp2_stream *tail;
@@ -160,14 +146,28 @@ typedef struct grpc_chttp2_outstanding_ping {
struct grpc_chttp2_outstanding_ping *prev;
} grpc_chttp2_outstanding_ping;
+/* forward declared in frame_data.h */
+struct grpc_chttp2_incoming_byte_stream {
+ grpc_byte_stream base;
+ gpr_refcount refs;
+ struct grpc_chttp2_incoming_byte_stream *next_message;
+
+ grpc_chttp2_transport *transport;
+ grpc_chttp2_stream *stream;
+ int is_tail;
+ gpr_slice_buffer slices;
+ grpc_closure *on_next;
+ gpr_slice *next;
+};
+
typedef struct {
/** data to write next write */
gpr_slice_buffer qbuf;
/** window available for us to send to peer */
gpr_int64 outgoing_window;
- /** window available for peer to send to us - updated after parse */
- gpr_uint32 incoming_window;
+ /** window available to announce to peer */
+ gpr_int64 announce_incoming_window;
/** how much window would we like to have for incoming_window */
gpr_uint32 connection_window_target;
@@ -209,6 +209,7 @@ typedef struct {
gpr_slice_buffer outbuf;
/** hpack encoding */
grpc_chttp2_hpack_compressor hpack_compressor;
+ gpr_int64 outgoing_window;
/** is this a client? */
gpr_uint8 is_client;
/** callback for when writing is done */
@@ -233,6 +234,7 @@ struct grpc_chttp2_transport_parsing {
gpr_slice_buffer qbuf;
/* metadata object cache */
grpc_mdstr *str_grpc_timeout;
+ grpc_mdelem *elem_grpc_status_ok;
/** parser for headers */
grpc_chttp2_hpack_parser hpack_parser;
/** simple one shot parsers */
@@ -246,8 +248,7 @@ struct grpc_chttp2_transport_parsing {
grpc_chttp2_goaway_parser goaway_parser;
/** window available for peer to send to us */
- gpr_uint32 incoming_window;
- gpr_uint32 incoming_window_delta;
+ gpr_int64 incoming_window;
/** next stream id available at the time of beginning parsing */
gpr_uint32 next_stream_id;
@@ -278,7 +279,7 @@ struct grpc_chttp2_transport_parsing {
gpr_uint32 goaway_last_stream_index;
gpr_slice goaway_text;
- gpr_int64 outgoing_window_update;
+ gpr_int64 outgoing_window;
/** pings awaiting responses */
grpc_chttp2_outstanding_ping pings;
@@ -345,8 +346,8 @@ struct grpc_chttp2_transport {
struct {
/* accept stream callback */
- void (*accept_stream)(void *user_data, grpc_transport *transport,
- const void *server_data);
+ void (*accept_stream)(grpc_exec_ctx *exec_ctx, void *user_data,
+ grpc_transport *transport, const void *server_data);
void *accept_stream_user_data;
/** connectivity tracking */
@@ -358,9 +359,6 @@ typedef struct {
/** HTTP2 stream id for this stream, or zero if one has not been assigned */
gpr_uint32 id;
- grpc_closure *send_done_closure;
- grpc_closure *recv_done_closure;
-
/** window available for us to send to peer */
gpr_int64 outgoing_window;
/** The number of bytes the upper layers have offered to receive.
@@ -371,54 +369,66 @@ typedef struct {
not yet announced to HTTP2 flow control.
As the upper layers offer to read more bytes, this value increases.
As we advertise incoming flow control window, this value decreases. */
- gpr_uint32 unannounced_incoming_window;
- /** The number of bytes of HTTP2 flow control we have advertised.
- As we advertise incoming flow control window, this value increases.
- As bytes are read, this value decreases.
- Updated after parse. */
- gpr_uint32 incoming_window;
- /** stream ops the transport user would like to send */
- grpc_stream_op_buffer *outgoing_sopb;
+ gpr_uint32 unannounced_incoming_window_for_parse;
+ gpr_uint32 unannounced_incoming_window_for_writing;
+ /** things the upper layers would like to send */
+ grpc_metadata_batch *send_initial_metadata;
+ grpc_closure *send_initial_metadata_finished;
+ grpc_byte_stream *send_message;
+ grpc_closure *send_message_finished;
+ grpc_metadata_batch *send_trailing_metadata;
+ grpc_closure *send_trailing_metadata_finished;
+
+ grpc_metadata_batch *recv_initial_metadata;
+ grpc_closure *recv_initial_metadata_finished;
+ grpc_byte_stream **recv_message;
+ grpc_closure *recv_message_ready;
+ grpc_metadata_batch *recv_trailing_metadata;
+ grpc_closure *recv_trailing_metadata_finished;
+
/** when the application requests writes be closed, the write_closed is
'queued'; when the close is flow controlled into the send path, we are
'sending' it; when the write has been performed it is 'sent' */
- grpc_chttp2_write_state write_state;
- /** is this stream closed (boolean) */
+ gpr_uint8 write_closed;
+ /** is this stream reading half-closed (boolean) */
gpr_uint8 read_closed;
- /** has this stream been cancelled? (boolean) */
- gpr_uint8 cancelled;
- grpc_status_code cancelled_status;
- /** have we told the upper layer that this stream is cancelled? */
- gpr_uint8 published_cancelled;
+ /** is this stream finished closing (and reportably closed) */
+ gpr_uint8 finished_close;
/** is this stream in the stream map? (boolean) */
gpr_uint8 in_stream_map;
- /** bitmask of GRPC_CHTTP2_WRITING_xxx above */
- gpr_uint8 writing_now;
- /** has anything been written to this stream? */
- gpr_uint8 written_anything;
-
- /** stream state already published to the upper layer */
- grpc_stream_state published_state;
- /** address to publish next stream state to */
- grpc_stream_state *publish_state;
- /** pointer to sop buffer to fill in with new stream ops */
- grpc_stream_op_buffer *publish_sopb;
- grpc_stream_op_buffer incoming_sopb;
+ /** has this stream seen an error? if 1, then pending incoming frames
+ can be thrown away */
+ gpr_uint8 seen_error;
- /** incoming metadata */
- grpc_chttp2_incoming_metadata_buffer incoming_metadata;
- grpc_chttp2_incoming_metadata_live_op_buffer outstanding_metadata;
+ gpr_uint8 published_initial_metadata;
+ gpr_uint8 published_trailing_metadata;
+ gpr_uint8 faked_trailing_metadata;
+
+ grpc_chttp2_incoming_metadata_buffer received_initial_metadata;
+ grpc_chttp2_incoming_metadata_buffer received_trailing_metadata;
+
+ grpc_chttp2_incoming_frame_queue incoming_frames;
} grpc_chttp2_stream_global;
typedef struct {
/** HTTP2 stream id for this stream, or zero if one has not been assigned */
gpr_uint32 id;
- /** sops that have passed flow control to be written */
- grpc_stream_op_buffer sopb;
- /** how strongly should we indicate closure with the next write */
- grpc_chttp2_send_closed send_closed;
+ gpr_uint8 fetching;
+ gpr_uint8 sent_initial_metadata;
+ gpr_uint8 sent_message;
+ gpr_uint8 sent_trailing_metadata;
+ gpr_uint8 read_closed;
+ /** send this initial metadata */
+ grpc_metadata_batch *send_initial_metadata;
+ grpc_byte_stream *send_message;
+ grpc_metadata_batch *send_trailing_metadata;
+ gpr_int64 outgoing_window;
/** how much window should we announce? */
gpr_uint32 announce_window;
+ gpr_slice_buffer flow_controlled_buffer;
+ gpr_slice fetching_slice;
+ size_t stream_fetched;
+ grpc_closure finished_fetch;
} grpc_chttp2_stream_writing;
struct grpc_chttp2_stream_parsing {
@@ -428,22 +438,29 @@ struct grpc_chttp2_stream_parsing {
gpr_uint8 received_close;
/** saw a rst_stream */
gpr_uint8 saw_rst_stream;
- /** incoming_window has been reduced by this much during parsing */
- gpr_uint32 incoming_window_delta;
+ /** how many header frames have we received? */
+ gpr_uint8 header_frames_received;
+ /** which metadata did we get (on this parse) */
+ gpr_uint8 got_metadata_on_parse[2];
+ /** should we raise the seen_error flag in transport_global */
+ gpr_uint8 seen_error;
/** window available for peer to send to us */
- gpr_uint32 incoming_window;
+ gpr_int64 incoming_window;
/** parsing state for data frames */
grpc_chttp2_data_parser data_parser;
/** reason give to rst_stream */
gpr_uint32 rst_stream_reason;
- /* amount of window given */
- gpr_uint64 outgoing_window_update;
+ /** amount of window given */
+ gpr_int64 outgoing_window;
+ /** number of bytes received - reset at end of parse thread execution */
+ gpr_int64 received_bytes;
/** incoming metadata */
- grpc_chttp2_incoming_metadata_buffer incoming_metadata;
+ grpc_chttp2_incoming_metadata_buffer metadata_buffer[2];
};
struct grpc_chttp2_stream {
+ grpc_stream_refcount *refcount;
grpc_chttp2_stream_global global;
grpc_chttp2_stream_writing writing;
grpc_chttp2_stream_parsing parsing;
@@ -504,21 +521,10 @@ void grpc_chttp2_list_remove_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
-void grpc_chttp2_list_add_incoming_window_updated(
- grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global *stream_global);
-int grpc_chttp2_list_pop_incoming_window_updated(
- grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_transport_parsing *transport_parsing,
- grpc_chttp2_stream_global **stream_global,
- grpc_chttp2_stream_parsing **stream_parsing);
-void grpc_chttp2_list_remove_incoming_window_updated(
- grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global *stream_global);
-
-void grpc_chttp2_list_add_writing_stream(
+/* returns 1 if stream added, 0 if it was already present */
+int grpc_chttp2_list_add_writing_stream(
grpc_chttp2_transport_writing *transport_writing,
- grpc_chttp2_stream_writing *stream_writing);
+ grpc_chttp2_stream_writing *stream_writing) GRPC_MUST_USE_RESULT;
int grpc_chttp2_list_have_writing_streams(
grpc_chttp2_transport_writing *transport_writing);
int grpc_chttp2_list_pop_writing_stream(
@@ -550,31 +556,44 @@ int grpc_chttp2_list_pop_waiting_for_concurrency(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global);
-void grpc_chttp2_list_add_closed_waiting_for_parsing(
+void grpc_chttp2_list_add_check_read_ops(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
-int grpc_chttp2_list_pop_closed_waiting_for_parsing(
+int grpc_chttp2_list_pop_check_read_ops(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global);
-void grpc_chttp2_list_add_cancelled_waiting_for_writing(
+void grpc_chttp2_list_add_stalled_by_transport(
+ grpc_chttp2_transport_writing *transport_writing,
+ grpc_chttp2_stream_writing *stream_writing);
+int grpc_chttp2_list_pop_stalled_by_transport(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global **stream_global);
+
+void grpc_chttp2_list_add_unannounced_incoming_window_available(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
-int grpc_chttp2_list_pop_cancelled_waiting_for_writing(
+void grpc_chttp2_list_remove_unannounced_incoming_window_available(
grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global **stream_global);
+ grpc_chttp2_stream_global *stream_global);
+int grpc_chttp2_list_pop_unannounced_incoming_window_available(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_transport_parsing *transport_parsing,
+ grpc_chttp2_stream_global **stream_global,
+ grpc_chttp2_stream_parsing **stream_parsing);
-void grpc_chttp2_list_add_read_write_state_changed(
+void grpc_chttp2_list_add_closed_waiting_for_parsing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
-int grpc_chttp2_list_pop_read_write_state_changed(
+int grpc_chttp2_list_pop_closed_waiting_for_parsing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global);
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id);
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
- grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id);
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
+ gpr_uint32 id);
void grpc_chttp2_add_incoming_goaway(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
@@ -592,7 +611,10 @@ void grpc_chttp2_for_all_streams(
grpc_chttp2_stream_global *stream_global));
void grpc_chttp2_parsing_become_skip_parser(
- grpc_chttp2_transport_parsing *transport_parsing);
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing);
+
+void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
+ grpc_closure **pclosure, int success);
#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
#define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
@@ -607,26 +629,122 @@ extern int grpc_flowctl_trace;
else \
stmt
-#define GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(reason, transport, context, var, \
- delta) \
- if (!(grpc_flowctl_trace)) { \
- } else { \
- grpc_chttp2_flowctl_trace(__FILE__, __LINE__, reason, #context, #var, \
- transport->is_client, context->id, \
- (gpr_int64)(context->var), (gpr_int64)(delta)); \
- }
-
-#define GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(reason, context, var, delta) \
- if (!(grpc_flowctl_trace)) { \
- } else { \
- grpc_chttp2_flowctl_trace(__FILE__, __LINE__, reason, #context, #var, \
- context->is_client, 0, \
- (gpr_int64)(context->var), (gpr_int64)(delta)); \
- }
-
-void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason,
- const char *context, const char *var,
- int is_client, gpr_uint32 stream_id,
- gpr_int64 current_value, gpr_int64 delta);
+typedef enum {
+ GRPC_CHTTP2_FLOWCTL_MOVE,
+ GRPC_CHTTP2_FLOWCTL_CREDIT,
+ GRPC_CHTTP2_FLOWCTL_DEBIT
+} grpc_chttp2_flowctl_op;
+
+#define GRPC_CHTTP2_FLOW_MOVE_COMMON(phase, transport, id1, id2, dst_context, \
+ dst_var, src_context, src_var) \
+ do { \
+ assert(id1 == id2); \
+ if (grpc_flowctl_trace) { \
+ grpc_chttp2_flowctl_trace( \
+ __FILE__, __LINE__, phase, GRPC_CHTTP2_FLOWCTL_MOVE, #dst_context, \
+ #dst_var, #src_context, #src_var, transport->is_client, id1, \
+ dst_context->dst_var, src_context->src_var); \
+ } \
+ dst_context->dst_var += src_context->src_var; \
+ src_context->src_var = 0; \
+ } while (0)
+
+#define GRPC_CHTTP2_FLOW_MOVE_STREAM(phase, transport, dst_context, dst_var, \
+ src_context, src_var) \
+ GRPC_CHTTP2_FLOW_MOVE_COMMON(phase, transport, dst_context->id, \
+ src_context->id, dst_context, dst_var, \
+ src_context, src_var)
+#define GRPC_CHTTP2_FLOW_MOVE_TRANSPORT(phase, dst_context, dst_var, \
+ src_context, src_var) \
+ GRPC_CHTTP2_FLOW_MOVE_COMMON(phase, dst_context, 0, 0, dst_context, dst_var, \
+ src_context, src_var)
+
+#define GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, transport, id, dst_context, \
+ dst_var, amount) \
+ do { \
+ if (grpc_flowctl_trace) { \
+ grpc_chttp2_flowctl_trace(__FILE__, __LINE__, phase, \
+ GRPC_CHTTP2_FLOWCTL_CREDIT, #dst_context, \
+ #dst_var, NULL, #amount, transport->is_client, \
+ id, dst_context->dst_var, amount); \
+ } \
+ dst_context->dst_var += amount; \
+ } while (0)
+
+#define GRPC_CHTTP2_FLOW_CREDIT_STREAM(phase, transport, dst_context, dst_var, \
+ amount) \
+ GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, transport, dst_context->id, \
+ dst_context, dst_var, amount)
+#define GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT(phase, dst_context, dst_var, amount) \
+ GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, dst_context, 0, dst_context, dst_var, \
+ amount)
+
+#define GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, transport, id, dst_context, \
+ dst_var, amount) \
+ do { \
+ if (grpc_flowctl_trace) { \
+ grpc_chttp2_flowctl_trace(__FILE__, __LINE__, phase, \
+ GRPC_CHTTP2_FLOWCTL_DEBIT, #dst_context, \
+ #dst_var, NULL, #amount, transport->is_client, \
+ id, dst_context->dst_var, amount); \
+ } \
+ dst_context->dst_var -= amount; \
+ } while (0)
+
+#define GRPC_CHTTP2_FLOW_DEBIT_STREAM(phase, transport, dst_context, dst_var, \
+ amount) \
+ GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, transport, dst_context->id, \
+ dst_context, dst_var, amount)
+#define GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT(phase, dst_context, dst_var, amount) \
+ GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, dst_context, 0, dst_context, dst_var, \
+ amount)
+
+void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase,
+ grpc_chttp2_flowctl_op op, const char *context1,
+ const char *var1, const char *context2,
+ const char *var2, int is_client,
+ gpr_uint32 stream_id, gpr_int64 val1,
+ gpr_int64 val2);
+
+void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream,
+ grpc_status_code status, gpr_slice *details);
+void grpc_chttp2_mark_stream_closed(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global, int close_reads,
+ int close_writes);
+void grpc_chttp2_start_writing(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_global *transport_global);
+
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+#define GRPC_CHTTP2_STREAM_REF(stream_global, reason) \
+ grpc_chttp2_stream_ref(stream_global, reason)
+#define GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, reason) \
+ grpc_chttp2_stream_unref(exec_ctx, stream_global, reason)
+void grpc_chttp2_stream_ref(grpc_chttp2_stream_global *stream_global,
+ const char *reason);
+void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_stream_global *stream_global,
+ const char *reason);
+#else
+#define GRPC_CHTTP2_STREAM_REF(stream_global, reason) \
+ grpc_chttp2_stream_ref(stream_global)
+#define GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, reason) \
+ grpc_chttp2_stream_unref(exec_ctx, stream_global)
+void grpc_chttp2_stream_ref(grpc_chttp2_stream_global *stream_global);
+void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_stream_global *stream_global);
+#endif
+
+grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
+ grpc_chttp2_transport_parsing *transport_parsing,
+ grpc_chttp2_stream_parsing *stream_parsing, gpr_uint32 frame_size,
+ gpr_uint32 flags, grpc_chttp2_incoming_frame_queue *add_to_queue);
+void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_incoming_byte_stream *bs,
+ gpr_slice slice);
+void grpc_chttp2_incoming_byte_stream_finished(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs);
#endif
diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c
index 5d4d8e70c4..8cef8fbb77 100644
--- a/src/core/transport/chttp2/parsing.c
+++ b/src/core/transport/chttp2/parsing.c
@@ -42,22 +42,28 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
-static int init_frame_parser(grpc_chttp2_transport_parsing *transport_parsing);
+static int init_frame_parser(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_parsing *transport_parsing);
static int init_header_frame_parser(
- grpc_chttp2_transport_parsing *transport_parsing, int is_continuation);
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
+ int is_continuation);
static int init_data_frame_parser(
- grpc_chttp2_transport_parsing *transport_parsing);
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing);
static int init_rst_stream_parser(
- grpc_chttp2_transport_parsing *transport_parsing);
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing);
static int init_settings_frame_parser(
- grpc_chttp2_transport_parsing *transport_parsing);
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing);
static int init_window_update_frame_parser(
- grpc_chttp2_transport_parsing *transport_parsing);
-static int init_ping_parser(grpc_chttp2_transport_parsing *transport_parsing);
-static int init_goaway_parser(grpc_chttp2_transport_parsing *transport_parsing);
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing);
+static int init_ping_parser(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_parsing *transport_parsing);
+static int init_goaway_parser(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_parsing *transport_parsing);
static int init_skip_frame_parser(
- grpc_chttp2_transport_parsing *transport_parsing, int is_header);
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
+ int is_header);
static int parse_frame_slice(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_parsing *transport_parsing,
@@ -74,23 +80,11 @@ void grpc_chttp2_prepare_to_read(
transport_parsing->next_stream_id = transport_global->next_stream_id;
/* update the parsing view of incoming window */
- if (transport_parsing->incoming_window != transport_global->incoming_window) {
- GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(
- "parse", transport_parsing, incoming_window,
- (gpr_int64)transport_global->incoming_window -
- (gpr_int64)transport_parsing->incoming_window);
- transport_parsing->incoming_window = transport_global->incoming_window;
- }
- while (grpc_chttp2_list_pop_incoming_window_updated(
+ while (grpc_chttp2_list_pop_unannounced_incoming_window_available(
transport_global, transport_parsing, &stream_global, &stream_parsing)) {
- stream_parsing->id = stream_global->id;
- if (stream_parsing->incoming_window != stream_global->incoming_window) {
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
- "parse", transport_parsing, stream_parsing, incoming_window,
- (gpr_int64)stream_global->incoming_window -
- (gpr_int64)stream_parsing->incoming_window);
- stream_parsing->incoming_window = stream_global->incoming_window;
- }
+ GRPC_CHTTP2_FLOW_MOVE_STREAM("parse", transport_parsing, stream_parsing,
+ incoming_window, stream_global,
+ unannounced_incoming_window_for_parse);
}
GPR_TIMER_END("grpc_chttp2_prepare_to_read", 0);
@@ -101,6 +95,8 @@ void grpc_chttp2_publish_reads(
grpc_chttp2_transport_parsing *transport_parsing) {
grpc_chttp2_stream_global *stream_global;
grpc_chttp2_stream_parsing *stream_parsing;
+ int was_zero;
+ int is_zero;
/* transport_parsing->last_incoming_stream_id is used as
last-grpc_chttp2_stream-id when
@@ -144,98 +140,102 @@ void grpc_chttp2_publish_reads(
}
/* propagate flow control tokens to global state */
- if (transport_parsing->outgoing_window_update) {
- GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(
- "parsed", transport_global, outgoing_window,
- transport_parsing->outgoing_window_update);
- GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(
- "parsed", transport_parsing, outgoing_window_update,
- -(gpr_int64)transport_parsing->outgoing_window_update);
- transport_global->outgoing_window +=
- transport_parsing->outgoing_window_update;
- transport_parsing->outgoing_window_update = 0;
- }
-
- if (transport_parsing->incoming_window_delta) {
- GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(
- "parsed", transport_global, incoming_window,
- -(gpr_int64)transport_parsing->incoming_window_delta);
- GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(
- "parsed", transport_parsing, incoming_window_delta,
- -(gpr_int64)transport_parsing->incoming_window_delta);
- transport_global->incoming_window -=
- transport_parsing->incoming_window_delta;
- transport_parsing->incoming_window_delta = 0;
+ was_zero = transport_global->outgoing_window <= 0;
+ GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("parsed", transport_global, outgoing_window,
+ transport_parsing, outgoing_window);
+ is_zero = transport_global->outgoing_window <= 0;
+ if (was_zero && !is_zero) {
+ while (grpc_chttp2_list_pop_stalled_by_transport(transport_global,
+ &stream_global)) {
+ grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ }
+ }
+
+ if (transport_parsing->incoming_window <
+ transport_global->connection_window_target * 3 / 4) {
+ gpr_int64 announce_bytes = transport_global->connection_window_target -
+ transport_parsing->incoming_window;
+ GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", transport_global,
+ announce_incoming_window, announce_bytes);
+ GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", transport_parsing,
+ incoming_window, announce_bytes);
}
/* for each stream that saw an update, fixup global state */
while (grpc_chttp2_list_pop_parsing_seen_stream(
transport_global, transport_parsing, &stream_global, &stream_parsing)) {
- /* update incoming flow control window */
- if (stream_parsing->incoming_window_delta) {
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
- "parsed", transport_parsing, stream_global, incoming_window,
- -(gpr_int64)stream_parsing->incoming_window_delta);
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
- "parsed", transport_parsing, stream_parsing, incoming_window_delta,
- -(gpr_int64)stream_parsing->incoming_window_delta);
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
- "parsed", transport_parsing, stream_global, max_recv_bytes,
- -(gpr_int64)stream_parsing->incoming_window_delta);
- stream_global->incoming_window -= stream_parsing->incoming_window_delta;
- GPR_ASSERT(stream_global->max_recv_bytes >=
- stream_parsing->incoming_window_delta);
- stream_global->max_recv_bytes -= stream_parsing->incoming_window_delta;
- stream_parsing->incoming_window_delta = 0;
- grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ if (stream_parsing->seen_error) {
+ stream_global->seen_error = 1;
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
/* update outgoing flow control window */
- if (stream_parsing->outgoing_window_update) {
- int was_zero = stream_global->outgoing_window <= 0;
- int is_zero;
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("parsed", transport_parsing,
- stream_global, outgoing_window,
- stream_parsing->outgoing_window_update);
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
- "parsed", transport_parsing, stream_parsing, outgoing_window_update,
- -(gpr_int64)stream_parsing->outgoing_window_update);
- GPR_ASSERT(stream_parsing->outgoing_window_update <= GPR_UINT32_MAX);
- stream_global->outgoing_window +=
- (gpr_uint32)stream_parsing->outgoing_window_update;
- stream_parsing->outgoing_window_update = 0;
- is_zero = stream_global->outgoing_window <= 0;
- if (was_zero && !is_zero) {
- grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
- }
+ was_zero = stream_global->outgoing_window <= 0;
+ GRPC_CHTTP2_FLOW_MOVE_STREAM("parsed", transport_global, stream_global,
+ outgoing_window, stream_parsing,
+ outgoing_window);
+ is_zero = stream_global->outgoing_window <= 0;
+ if (was_zero && !is_zero) {
+ grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
}
- /* updating closed status */
- if (stream_parsing->received_close) {
- stream_global->read_closed = 1;
- grpc_chttp2_list_add_read_write_state_changed(transport_global,
- stream_global);
+ stream_global->max_recv_bytes -= (gpr_uint32)GPR_MIN(
+ stream_global->max_recv_bytes, stream_parsing->received_bytes);
+ stream_parsing->received_bytes = 0;
+
+ /* publish incoming stream ops */
+ if (stream_global->incoming_frames.tail != NULL) {
+ stream_global->incoming_frames.tail->is_tail = 0;
+ }
+ if (stream_parsing->data_parser.incoming_frames.head != NULL) {
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ }
+ grpc_chttp2_incoming_frame_queue_merge(
+ &stream_global->incoming_frames,
+ &stream_parsing->data_parser.incoming_frames);
+ if (stream_global->incoming_frames.tail != NULL) {
+ stream_global->incoming_frames.tail->is_tail = 1;
}
+
+ if (!stream_global->published_initial_metadata &&
+ stream_parsing->got_metadata_on_parse[0]) {
+ stream_parsing->got_metadata_on_parse[0] = 0;
+ stream_global->published_initial_metadata = 1;
+ GPR_SWAP(grpc_chttp2_incoming_metadata_buffer,
+ stream_parsing->metadata_buffer[0],
+ stream_global->received_initial_metadata);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ }
+ if (!stream_global->published_trailing_metadata &&
+ stream_parsing->got_metadata_on_parse[1]) {
+ stream_parsing->got_metadata_on_parse[1] = 0;
+ stream_global->published_trailing_metadata = 1;
+ GPR_SWAP(grpc_chttp2_incoming_metadata_buffer,
+ stream_parsing->metadata_buffer[1],
+ stream_global->received_trailing_metadata);
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ }
+
if (stream_parsing->saw_rst_stream) {
- stream_global->cancelled = 1;
- stream_global->cancelled_status = grpc_chttp2_http2_error_to_grpc_status(
- (grpc_chttp2_error_code)stream_parsing->rst_stream_reason);
- if (stream_parsing->rst_stream_reason == GRPC_CHTTP2_NO_ERROR) {
- stream_global->published_cancelled = 1;
+ if (stream_parsing->rst_stream_reason != GRPC_CHTTP2_NO_ERROR) {
+ grpc_status_code status_code = grpc_chttp2_http2_error_to_grpc_status(
+ (grpc_chttp2_error_code)stream_parsing->rst_stream_reason);
+ char *status_details;
+ gpr_slice slice_details;
+ gpr_asprintf(&status_details, "Received RST_STREAM err=%d",
+ stream_parsing->rst_stream_reason);
+ slice_details = gpr_slice_from_copied_string(status_details);
+ gpr_free(status_details);
+ grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global,
+ status_code, &slice_details);
}
- grpc_chttp2_list_add_read_write_state_changed(transport_global,
- stream_global);
+ grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global,
+ 1, 1);
}
- /* publish incoming stream ops */
- if (stream_parsing->data_parser.incoming_sopb.nops > 0) {
- grpc_incoming_metadata_buffer_move_to_referencing_sopb(
- &stream_parsing->incoming_metadata, &stream_global->incoming_metadata,
- &stream_parsing->data_parser.incoming_sopb);
- grpc_sopb_move_to(&stream_parsing->data_parser.incoming_sopb,
- &stream_global->incoming_sopb);
- grpc_chttp2_list_add_read_write_state_changed(transport_global,
- stream_global);
+ if (stream_parsing->received_close) {
+ grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global,
+ 1, 0);
}
}
}
@@ -363,7 +363,7 @@ int grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(cur < end);
transport_parsing->incoming_stream_id |= ((gpr_uint32)*cur);
transport_parsing->deframe_state = GRPC_DTS_FRAME;
- if (!init_frame_parser(transport_parsing)) {
+ if (!init_frame_parser(exec_ctx, transport_parsing)) {
return 0;
}
if (transport_parsing->incoming_stream_id) {
@@ -428,7 +428,8 @@ int grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx,
GPR_UNREACHABLE_CODE(return 0);
}
-static int init_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) {
+static int init_frame_parser(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_parsing *transport_parsing) {
if (transport_parsing->expect_continuation_stream_id != 0) {
if (transport_parsing->incoming_frame_type !=
GRPC_CHTTP2_FRAME_CONTINUATION) {
@@ -445,30 +446,30 @@ static int init_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) {
transport_parsing->incoming_stream_id);
return 0;
}
- return init_header_frame_parser(transport_parsing, 1);
+ return init_header_frame_parser(exec_ctx, transport_parsing, 1);
}
switch (transport_parsing->incoming_frame_type) {
case GRPC_CHTTP2_FRAME_DATA:
- return init_data_frame_parser(transport_parsing);
+ return init_data_frame_parser(exec_ctx, transport_parsing);
case GRPC_CHTTP2_FRAME_HEADER:
- return init_header_frame_parser(transport_parsing, 0);
+ return init_header_frame_parser(exec_ctx, transport_parsing, 0);
case GRPC_CHTTP2_FRAME_CONTINUATION:
gpr_log(GPR_ERROR, "Unexpected CONTINUATION frame");
return 0;
case GRPC_CHTTP2_FRAME_RST_STREAM:
- return init_rst_stream_parser(transport_parsing);
+ return init_rst_stream_parser(exec_ctx, transport_parsing);
case GRPC_CHTTP2_FRAME_SETTINGS:
- return init_settings_frame_parser(transport_parsing);
+ return init_settings_frame_parser(exec_ctx, transport_parsing);
case GRPC_CHTTP2_FRAME_WINDOW_UPDATE:
- return init_window_update_frame_parser(transport_parsing);
+ return init_window_update_frame_parser(exec_ctx, transport_parsing);
case GRPC_CHTTP2_FRAME_PING:
- return init_ping_parser(transport_parsing);
+ return init_ping_parser(exec_ctx, transport_parsing);
case GRPC_CHTTP2_FRAME_GOAWAY:
- return init_goaway_parser(transport_parsing);
+ return init_goaway_parser(exec_ctx, transport_parsing);
default:
gpr_log(GPR_ERROR, "Unknown frame type %02x",
transport_parsing->incoming_frame_type);
- return init_skip_frame_parser(transport_parsing, 0);
+ return init_skip_frame_parser(exec_ctx, transport_parsing, 0);
}
}
@@ -482,7 +483,8 @@ static grpc_chttp2_parse_error skip_parser(
static void skip_header(void *tp, grpc_mdelem *md) { GRPC_MDELEM_UNREF(md); }
static int init_skip_frame_parser(
- grpc_chttp2_transport_parsing *transport_parsing, int is_header) {
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
+ int is_header) {
if (is_header) {
gpr_uint8 is_eoh = transport_parsing->expect_continuation_stream_id != 0;
transport_parsing->parser = grpc_chttp2_header_parser_parse;
@@ -499,65 +501,51 @@ static int init_skip_frame_parser(
}
void grpc_chttp2_parsing_become_skip_parser(
- grpc_chttp2_transport_parsing *transport_parsing) {
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) {
init_skip_frame_parser(
- transport_parsing,
+ exec_ctx, transport_parsing,
transport_parsing->parser == grpc_chttp2_header_parser_parse);
}
static grpc_chttp2_parse_error update_incoming_window(
- grpc_chttp2_transport_parsing *transport_parsing,
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing) {
- if (transport_parsing->incoming_frame_size >
- transport_parsing->incoming_window) {
+ gpr_uint32 incoming_frame_size = transport_parsing->incoming_frame_size;
+ if (incoming_frame_size > transport_parsing->incoming_window) {
gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
transport_parsing->incoming_frame_size,
transport_parsing->incoming_window);
return GRPC_CHTTP2_CONNECTION_ERROR;
}
- if (transport_parsing->incoming_frame_size >
- stream_parsing->incoming_window) {
+ if (incoming_frame_size > stream_parsing->incoming_window) {
gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
transport_parsing->incoming_frame_size,
stream_parsing->incoming_window);
return GRPC_CHTTP2_CONNECTION_ERROR;
}
- GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(
- "data", transport_parsing, incoming_window,
- -(gpr_int64)transport_parsing->incoming_frame_size);
- GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT("data", transport_parsing,
- incoming_window_delta,
- transport_parsing->incoming_frame_size);
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
- "data", transport_parsing, stream_parsing, incoming_window,
- -(gpr_int64)transport_parsing->incoming_frame_size);
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("data", transport_parsing, stream_parsing,
- incoming_window_delta,
- transport_parsing->incoming_frame_size);
-
- transport_parsing->incoming_window -= transport_parsing->incoming_frame_size;
- transport_parsing->incoming_window_delta +=
- transport_parsing->incoming_frame_size;
- stream_parsing->incoming_window -= transport_parsing->incoming_frame_size;
- stream_parsing->incoming_window_delta +=
- transport_parsing->incoming_frame_size;
+ GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", transport_parsing, incoming_window,
+ incoming_frame_size);
+ GRPC_CHTTP2_FLOW_DEBIT_STREAM("parse", transport_parsing, stream_parsing,
+ incoming_window, incoming_frame_size);
+ stream_parsing->received_bytes += incoming_frame_size;
+
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing);
return GRPC_CHTTP2_PARSE_OK;
}
static int init_data_frame_parser(
- grpc_chttp2_transport_parsing *transport_parsing) {
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) {
grpc_chttp2_stream_parsing *stream_parsing =
grpc_chttp2_parsing_lookup_stream(transport_parsing,
transport_parsing->incoming_stream_id);
grpc_chttp2_parse_error err = GRPC_CHTTP2_PARSE_OK;
if (!stream_parsing || stream_parsing->received_close)
- return init_skip_frame_parser(transport_parsing, 0);
+ return init_skip_frame_parser(exec_ctx, transport_parsing, 0);
if (err == GRPC_CHTTP2_PARSE_OK) {
- err = update_incoming_window(transport_parsing, stream_parsing);
+ err = update_incoming_window(exec_ctx, transport_parsing, stream_parsing);
}
if (err == GRPC_CHTTP2_PARSE_OK) {
err = grpc_chttp2_data_parser_begin_frame(
@@ -577,7 +565,7 @@ static int init_data_frame_parser(
&transport_parsing->qbuf,
grpc_chttp2_rst_stream_create(transport_parsing->incoming_stream_id,
GRPC_CHTTP2_PROTOCOL_ERROR));
- return init_skip_frame_parser(transport_parsing, 0);
+ return init_skip_frame_parser(exec_ctx, transport_parsing, 0);
case GRPC_CHTTP2_CONNECTION_ERROR:
return 0;
}
@@ -586,11 +574,13 @@ static int init_data_frame_parser(
static void free_timeout(void *p) { gpr_free(p); }
-static void on_header(void *tp, grpc_mdelem *md) {
+static void on_initial_header(void *tp, grpc_mdelem *md) {
grpc_chttp2_transport_parsing *transport_parsing = tp;
grpc_chttp2_stream_parsing *stream_parsing =
transport_parsing->incoming_stream;
+ GPR_TIMER_BEGIN("on_initial_header", 0);
+
GPR_ASSERT(stream_parsing);
GRPC_CHTTP2_IF_TRACING(gpr_log(
@@ -598,6 +588,12 @@ static void on_header(void *tp, grpc_mdelem *md) {
transport_parsing->is_client ? "CLI" : "SVR",
grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)));
+ if (md->key == transport_parsing->elem_grpc_status_ok->key &&
+ md != transport_parsing->elem_grpc_status_ok) {
+ /* TODO(ctiller): check for a status like " 0" */
+ stream_parsing->seen_error = 1;
+ }
+
if (md->key == transport_parsing->str_grpc_timeout) {
gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout);
if (!cached_timeout) {
@@ -612,24 +608,57 @@ static void on_header(void *tp, grpc_mdelem *md) {
grpc_mdelem_set_user_data(md, free_timeout, cached_timeout);
}
grpc_chttp2_incoming_metadata_buffer_set_deadline(
- &stream_parsing->incoming_metadata,
+ &stream_parsing->metadata_buffer[0],
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), *cached_timeout));
GRPC_MDELEM_UNREF(md);
} else {
- grpc_chttp2_incoming_metadata_buffer_add(&stream_parsing->incoming_metadata,
- md);
+ grpc_chttp2_incoming_metadata_buffer_add(
+ &stream_parsing->metadata_buffer[0], md);
}
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing);
+
+ GPR_TIMER_END("on_initial_header", 0);
+}
+
+static void on_trailing_header(void *tp, grpc_mdelem *md) {
+ grpc_chttp2_transport_parsing *transport_parsing = tp;
+ grpc_chttp2_stream_parsing *stream_parsing =
+ transport_parsing->incoming_stream;
+
+ GPR_TIMER_BEGIN("on_trailing_header", 0);
+
+ GPR_ASSERT(stream_parsing);
+
+ GRPC_CHTTP2_IF_TRACING(gpr_log(
+ GPR_INFO, "HTTP:%d:TRL:%s: %s: %s", stream_parsing->id,
+ transport_parsing->is_client ? "CLI" : "SVR",
+ grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)));
+
+ if (md->key == transport_parsing->elem_grpc_status_ok->key &&
+ md != transport_parsing->elem_grpc_status_ok) {
+ /* TODO(ctiller): check for a status like " 0" */
+ stream_parsing->seen_error = 1;
+ }
+
+ grpc_chttp2_incoming_metadata_buffer_add(&stream_parsing->metadata_buffer[1],
+ md);
+
+ grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing);
+
+ GPR_TIMER_END("on_trailing_header", 0);
}
static int init_header_frame_parser(
- grpc_chttp2_transport_parsing *transport_parsing, int is_continuation) {
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
+ int is_continuation) {
gpr_uint8 is_eoh = (transport_parsing->incoming_frame_flags &
GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0;
int via_accept = 0;
grpc_chttp2_stream_parsing *stream_parsing;
+ /* TODO(ctiller): when to increment header_frames_received? */
+
if (is_eoh) {
transport_parsing->expect_continuation_stream_id = 0;
} else {
@@ -649,7 +678,7 @@ static int init_header_frame_parser(
if (is_continuation) {
gpr_log(GPR_ERROR,
"grpc_chttp2_stream disbanded before CONTINUATION received");
- return init_skip_frame_parser(transport_parsing, 1);
+ return init_skip_frame_parser(exec_ctx, transport_parsing, 1);
}
if (transport_parsing->is_client) {
if ((transport_parsing->incoming_stream_id & 1) &&
@@ -660,7 +689,7 @@ static int init_header_frame_parser(
gpr_log(GPR_ERROR,
"ignoring new grpc_chttp2_stream creation on client");
}
- return init_skip_frame_parser(transport_parsing, 1);
+ return init_skip_frame_parser(exec_ctx, transport_parsing, 1);
} else if (transport_parsing->last_incoming_stream_id >
transport_parsing->incoming_stream_id) {
gpr_log(GPR_ERROR,
@@ -669,19 +698,19 @@ static int init_header_frame_parser(
"id=%d, new grpc_chttp2_stream id=%d",
transport_parsing->last_incoming_stream_id,
transport_parsing->incoming_stream_id);
- return init_skip_frame_parser(transport_parsing, 1);
+ return init_skip_frame_parser(exec_ctx, transport_parsing, 1);
} else if ((transport_parsing->incoming_stream_id & 1) == 0) {
gpr_log(GPR_ERROR,
"ignoring grpc_chttp2_stream with non-client generated index %d",
transport_parsing->incoming_stream_id);
- return init_skip_frame_parser(transport_parsing, 1);
+ return init_skip_frame_parser(exec_ctx, transport_parsing, 1);
}
stream_parsing = transport_parsing->incoming_stream =
grpc_chttp2_parsing_accept_stream(
- transport_parsing, transport_parsing->incoming_stream_id);
+ exec_ctx, transport_parsing, transport_parsing->incoming_stream_id);
if (stream_parsing == NULL) {
gpr_log(GPR_ERROR, "grpc_chttp2_stream not accepted");
- return init_skip_frame_parser(transport_parsing, 1);
+ return init_skip_frame_parser(exec_ctx, transport_parsing, 1);
}
via_accept = 1;
} else {
@@ -691,11 +720,21 @@ static int init_header_frame_parser(
if (stream_parsing->received_close) {
gpr_log(GPR_ERROR, "skipping already closed grpc_chttp2_stream header");
transport_parsing->incoming_stream = NULL;
- return init_skip_frame_parser(transport_parsing, 1);
+ return init_skip_frame_parser(exec_ctx, transport_parsing, 1);
}
transport_parsing->parser = grpc_chttp2_header_parser_parse;
transport_parsing->parser_data = &transport_parsing->hpack_parser;
- transport_parsing->hpack_parser.on_header = on_header;
+ switch (stream_parsing->header_frames_received) {
+ case 0:
+ transport_parsing->hpack_parser.on_header = on_initial_header;
+ break;
+ case 1:
+ transport_parsing->hpack_parser.on_header = on_trailing_header;
+ break;
+ case 2:
+ gpr_log(GPR_ERROR, "too many header frames received");
+ return init_skip_frame_parser(exec_ctx, transport_parsing, 1);
+ }
transport_parsing->hpack_parser.on_header_user_data = transport_parsing;
transport_parsing->hpack_parser.is_boundary = is_eoh;
transport_parsing->hpack_parser.is_eof =
@@ -708,7 +747,7 @@ static int init_header_frame_parser(
}
static int init_window_update_frame_parser(
- grpc_chttp2_transport_parsing *transport_parsing) {
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) {
int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_window_update_parser_begin_frame(
&transport_parsing->simple.window_update,
transport_parsing->incoming_frame_size,
@@ -722,7 +761,8 @@ static int init_window_update_frame_parser(
return ok;
}
-static int init_ping_parser(grpc_chttp2_transport_parsing *transport_parsing) {
+static int init_ping_parser(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_parsing *transport_parsing) {
int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_ping_parser_begin_frame(
&transport_parsing->simple.ping,
transport_parsing->incoming_frame_size,
@@ -733,7 +773,7 @@ static int init_ping_parser(grpc_chttp2_transport_parsing *transport_parsing) {
}
static int init_rst_stream_parser(
- grpc_chttp2_transport_parsing *transport_parsing) {
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) {
int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_rst_stream_parser_begin_frame(
&transport_parsing->simple.rst_stream,
transport_parsing->incoming_frame_size,
@@ -741,7 +781,7 @@ static int init_rst_stream_parser(
transport_parsing->incoming_stream = grpc_chttp2_parsing_lookup_stream(
transport_parsing, transport_parsing->incoming_stream_id);
if (!transport_parsing->incoming_stream) {
- return init_skip_frame_parser(transport_parsing, 0);
+ return init_skip_frame_parser(exec_ctx, transport_parsing, 0);
}
transport_parsing->parser = grpc_chttp2_rst_stream_parser_parse;
transport_parsing->parser_data = &transport_parsing->simple.rst_stream;
@@ -749,7 +789,7 @@ static int init_rst_stream_parser(
}
static int init_goaway_parser(
- grpc_chttp2_transport_parsing *transport_parsing) {
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) {
int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_goaway_parser_begin_frame(
&transport_parsing->goaway_parser,
transport_parsing->incoming_frame_size,
@@ -760,7 +800,7 @@ static int init_goaway_parser(
}
static int init_settings_frame_parser(
- grpc_chttp2_transport_parsing *transport_parsing) {
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) {
int ok;
if (transport_parsing->incoming_stream_id != 0) {
@@ -806,7 +846,7 @@ static int parse_frame_slice(grpc_exec_ctx *exec_ctx,
}
return 1;
case GRPC_CHTTP2_STREAM_ERROR:
- grpc_chttp2_parsing_become_skip_parser(transport_parsing);
+ grpc_chttp2_parsing_become_skip_parser(exec_ctx, transport_parsing);
if (stream_parsing) {
stream_parsing->saw_rst_stream = 1;
stream_parsing->rst_stream_reason = GRPC_CHTTP2_PROTOCOL_ERROR;
diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c
index 781db7b0d6..a81ffcce79 100644
--- a/src/core/transport/chttp2/stream_lists.c
+++ b/src/core/transport/chttp2/stream_lists.c
@@ -142,12 +142,13 @@ static void stream_list_add_tail(grpc_chttp2_transport *t,
s->included[id] = 1;
}
-static void stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
- grpc_chttp2_stream_list_id id) {
+static int stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
+ grpc_chttp2_stream_list_id id) {
if (s->included[id]) {
- return;
+ return 0;
}
stream_list_add_tail(t, s, id);
+ return 1;
}
/* wrappers for specializations */
@@ -192,12 +193,12 @@ void grpc_chttp2_list_remove_writable_stream(
GRPC_CHTTP2_LIST_WRITABLE);
}
-void grpc_chttp2_list_add_writing_stream(
+int grpc_chttp2_list_add_writing_stream(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) {
- stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
- STREAM_FROM_WRITING(stream_writing),
- GRPC_CHTTP2_LIST_WRITING);
+ return stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
+ STREAM_FROM_WRITING(stream_writing),
+ GRPC_CHTTP2_LIST_WRITING);
}
int grpc_chttp2_list_have_writing_streams(
@@ -241,6 +242,40 @@ int grpc_chttp2_list_pop_written_stream(
return r;
}
+void grpc_chttp2_list_add_unannounced_incoming_window_available(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global) {
+ GPR_ASSERT(stream_global->id != 0);
+ stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
+ STREAM_FROM_GLOBAL(stream_global),
+ GRPC_CHTTP2_LIST_UNANNOUNCED_INCOMING_WINDOW_AVAILABLE);
+}
+
+void grpc_chttp2_list_remove_unannounced_incoming_window_available(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global) {
+ stream_list_maybe_remove(
+ TRANSPORT_FROM_GLOBAL(transport_global),
+ STREAM_FROM_GLOBAL(stream_global),
+ GRPC_CHTTP2_LIST_UNANNOUNCED_INCOMING_WINDOW_AVAILABLE);
+}
+
+int grpc_chttp2_list_pop_unannounced_incoming_window_available(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_transport_parsing *transport_parsing,
+ grpc_chttp2_stream_global **stream_global,
+ grpc_chttp2_stream_parsing **stream_parsing) {
+ grpc_chttp2_stream *stream;
+ int r =
+ stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
+ GRPC_CHTTP2_LIST_UNANNOUNCED_INCOMING_WINDOW_AVAILABLE);
+ if (r != 0) {
+ *stream_global = &stream->global;
+ *stream_parsing = &stream->parsing;
+ }
+ return r;
+}
+
void grpc_chttp2_list_add_parsing_seen_stream(
grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing) {
@@ -284,91 +319,60 @@ int grpc_chttp2_list_pop_waiting_for_concurrency(
return r;
}
-void grpc_chttp2_list_add_closed_waiting_for_parsing(
+void grpc_chttp2_list_add_check_read_ops(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
- GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING);
+ GRPC_CHTTP2_LIST_CHECK_READ_OPS);
}
-int grpc_chttp2_list_pop_closed_waiting_for_parsing(
+int grpc_chttp2_list_pop_check_read_ops(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
- GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING);
+ GRPC_CHTTP2_LIST_CHECK_READ_OPS);
if (r != 0) {
*stream_global = &stream->global;
}
return r;
}
-void grpc_chttp2_list_add_cancelled_waiting_for_writing(
- grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global *stream_global) {
- stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
- STREAM_FROM_GLOBAL(stream_global),
- GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING);
+void grpc_chttp2_list_add_stalled_by_transport(
+ grpc_chttp2_transport_writing *transport_writing,
+ grpc_chttp2_stream_writing *stream_writing) {
+ stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
+ STREAM_FROM_WRITING(stream_writing),
+ GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
}
-int grpc_chttp2_list_pop_cancelled_waiting_for_writing(
+int grpc_chttp2_list_pop_stalled_by_transport(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
- GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING);
+ GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
if (r != 0) {
*stream_global = &stream->global;
}
return r;
}
-void grpc_chttp2_list_add_incoming_window_updated(
- grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global *stream_global) {
- stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
- STREAM_FROM_GLOBAL(stream_global),
- GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED);
-}
-
-int grpc_chttp2_list_pop_incoming_window_updated(
- grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_transport_parsing *transport_parsing,
- grpc_chttp2_stream_global **stream_global,
- grpc_chttp2_stream_parsing **stream_parsing) {
- grpc_chttp2_stream *stream;
- int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
- GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED);
- if (r != 0) {
- *stream_global = &stream->global;
- *stream_parsing = &stream->parsing;
- }
- return r;
-}
-
-void grpc_chttp2_list_remove_incoming_window_updated(
- grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global *stream_global) {
- stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global),
- STREAM_FROM_GLOBAL(stream_global),
- GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED);
-}
-
-void grpc_chttp2_list_add_read_write_state_changed(
+void grpc_chttp2_list_add_closed_waiting_for_parsing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
- GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED);
+ GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING);
}
-int grpc_chttp2_list_pop_read_write_state_changed(
+int grpc_chttp2_list_pop_closed_waiting_for_parsing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
- GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED);
+ GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING);
if (r != 0) {
*stream_global = &stream->global;
}
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index 69ad8854ba..353e476e40 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -40,15 +40,16 @@
#include "src/core/profiling/timers.h"
#include "src/core/transport/chttp2/http2_errors.h"
-static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing);
+static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_writing *transport_writing);
int grpc_chttp2_unlocking_check_writes(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_writing *transport_writing) {
grpc_chttp2_stream_global *stream_global;
grpc_chttp2_stream_writing *stream_writing;
- grpc_chttp2_stream_global *first_reinserted_stream = NULL;
- gpr_uint32 window_delta;
+
+ GPR_TIMER_BEGIN("grpc_chttp2_unlocking_check_writes", 0);
/* simple writes are queued to qbuf, and flushed here */
gpr_slice_buffer_swap(&transport_global->qbuf, &transport_writing->outbuf);
@@ -67,98 +68,103 @@ int grpc_chttp2_unlocking_check_writes(
transport_global->sent_local_settings = 1;
}
+ GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window,
+ transport_global, outgoing_window);
+
/* for each grpc_chttp2_stream that's become writable, frame it's data
(according to available window sizes) and add to the output buffer */
while (grpc_chttp2_list_pop_writable_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
- if (stream_global == first_reinserted_stream) {
- /* prevent infinite loop */
- grpc_chttp2_list_add_first_writable_stream(transport_global,
- stream_global);
- break;
- }
+ gpr_uint8 sent_initial_metadata;
stream_writing->id = stream_global->id;
- stream_writing->send_closed = GRPC_DONT_SEND_CLOSED;
-
- if (stream_global->outgoing_sopb) {
- window_delta = grpc_chttp2_preencode(
- stream_global->outgoing_sopb->ops,
- &stream_global->outgoing_sopb->nops,
- (gpr_uint32)GPR_MIN(GPR_MIN(transport_global->outgoing_window,
- stream_global->outgoing_window),
- GPR_UINT32_MAX),
- &stream_writing->sopb);
- GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(
- "write", transport_global, outgoing_window, -(gpr_int64)window_delta);
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global,
- outgoing_window,
- -(gpr_int64)window_delta);
- transport_global->outgoing_window -= window_delta;
- stream_global->outgoing_window -= window_delta;
-
- if (stream_global->write_state == GRPC_WRITE_STATE_QUEUED_CLOSE &&
- stream_global->outgoing_sopb->nops == 0) {
- if (!transport_global->is_client && !stream_global->read_closed) {
- stream_writing->send_closed = GRPC_SEND_CLOSED_WITH_RST_STREAM;
+ stream_writing->read_closed = stream_global->read_closed;
+
+ GRPC_CHTTP2_FLOW_MOVE_STREAM("write", transport_writing, stream_writing,
+ outgoing_window, stream_global,
+ outgoing_window);
+
+ sent_initial_metadata = stream_writing->sent_initial_metadata;
+ if (!sent_initial_metadata && stream_global->send_initial_metadata) {
+ stream_writing->send_initial_metadata =
+ stream_global->send_initial_metadata;
+ stream_global->send_initial_metadata = NULL;
+ if (grpc_chttp2_list_add_writing_stream(transport_writing,
+ stream_writing)) {
+ GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
+ }
+ sent_initial_metadata = 1;
+ }
+ if (sent_initial_metadata) {
+ if (stream_global->send_message != NULL) {
+ gpr_slice hdr = gpr_slice_malloc(5);
+ gpr_uint8 *p = GPR_SLICE_START_PTR(hdr);
+ gpr_uint32 len = stream_global->send_message->length;
+ GPR_ASSERT(stream_writing->send_message == NULL);
+ p[0] = (stream_global->send_message->flags &
+ GRPC_WRITE_INTERNAL_COMPRESS) != 0;
+ p[1] = (gpr_uint8)(len >> 24);
+ p[2] = (gpr_uint8)(len >> 16);
+ p[3] = (gpr_uint8)(len >> 8);
+ p[4] = (gpr_uint8)(len);
+ gpr_slice_buffer_add(&stream_writing->flow_controlled_buffer, hdr);
+ if (stream_global->send_message->length > 0) {
+ stream_writing->send_message = stream_global->send_message;
} else {
- stream_writing->send_closed = GRPC_SEND_CLOSED;
+ stream_writing->send_message = NULL;
}
+ stream_writing->stream_fetched = 0;
+ stream_global->send_message = NULL;
}
-
- if (stream_global->outgoing_window > 0 &&
- stream_global->outgoing_sopb->nops != 0) {
- grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
- if (first_reinserted_stream == NULL &&
- transport_global->outgoing_window == 0) {
- first_reinserted_stream = stream_global;
+ if ((stream_writing->send_message != NULL ||
+ stream_writing->flow_controlled_buffer.length > 0) &&
+ stream_writing->outgoing_window > 0) {
+ if (transport_writing->outgoing_window > 0) {
+ if (grpc_chttp2_list_add_writing_stream(transport_writing,
+ stream_writing)) {
+ GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
+ }
+ } else {
+ grpc_chttp2_list_add_stalled_by_transport(transport_writing,
+ stream_writing);
+ }
+ }
+ if (stream_global->send_trailing_metadata) {
+ stream_writing->send_trailing_metadata =
+ stream_global->send_trailing_metadata;
+ stream_global->send_trailing_metadata = NULL;
+ if (grpc_chttp2_list_add_writing_stream(transport_writing,
+ stream_writing)) {
+ GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
}
}
}
if (!stream_global->read_closed &&
- stream_global->unannounced_incoming_window > 0) {
- GPR_ASSERT(stream_writing->announce_window == 0);
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
- "write", transport_writing, stream_writing, announce_window,
- stream_global->unannounced_incoming_window);
- stream_writing->announce_window =
- stream_global->unannounced_incoming_window;
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
- "write", transport_global, stream_global, incoming_window,
- stream_global->unannounced_incoming_window);
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
- "write", transport_global, stream_global, unannounced_incoming_window,
- -(gpr_int64)stream_global->unannounced_incoming_window);
- stream_global->incoming_window +=
- stream_global->unannounced_incoming_window;
- stream_global->unannounced_incoming_window = 0;
- grpc_chttp2_list_add_incoming_window_updated(transport_global,
- stream_global);
- stream_global->writing_now |= GRPC_CHTTP2_WRITING_WINDOW;
- }
- if (stream_writing->sopb.nops > 0 ||
- stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
- stream_global->writing_now |= GRPC_CHTTP2_WRITING_DATA;
- }
- if (stream_global->writing_now != 0) {
- grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
+ stream_global->unannounced_incoming_window_for_writing > 1024) {
+ GRPC_CHTTP2_FLOW_MOVE_STREAM("write", transport_global, stream_writing,
+ announce_window, stream_global,
+ unannounced_incoming_window_for_writing);
+ if (grpc_chttp2_list_add_writing_stream(transport_writing,
+ stream_writing)) {
+ GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
+ }
}
}
/* if the grpc_chttp2_transport is ready to send a window update, do so here
also; 3/4 is a magic number that will likely get tuned soon */
- if (transport_global->incoming_window <
- transport_global->connection_window_target * 3 / 4) {
- window_delta = transport_global->connection_window_target -
- transport_global->incoming_window;
+ if (transport_global->announce_incoming_window > 0) {
+ gpr_uint32 announced = (gpr_uint32)GPR_MIN(
+ transport_global->announce_incoming_window, GPR_UINT32_MAX);
+ GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", transport_global,
+ announce_incoming_window, announced);
gpr_slice_buffer_add(&transport_writing->outbuf,
- grpc_chttp2_window_update_create(0, window_delta));
- GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT("write", transport_global,
- incoming_window, window_delta);
- transport_global->incoming_window += window_delta;
+ grpc_chttp2_window_update_create(0, announced));
}
+ GPR_TIMER_END("grpc_chttp2_unlocking_check_writes", 0);
+
return transport_writing->outbuf.count > 0 ||
grpc_chttp2_list_have_writing_streams(transport_writing);
}
@@ -169,50 +175,146 @@ void grpc_chttp2_perform_writes(
GPR_ASSERT(transport_writing->outbuf.count > 0 ||
grpc_chttp2_list_have_writing_streams(transport_writing));
- finalize_outbuf(transport_writing);
+ finalize_outbuf(exec_ctx, transport_writing);
- GPR_ASSERT(transport_writing->outbuf.count > 0);
GPR_ASSERT(endpoint);
- grpc_endpoint_write(exec_ctx, endpoint, &transport_writing->outbuf,
- &transport_writing->done_cb);
+ if (transport_writing->outbuf.count > 0) {
+ grpc_endpoint_write(exec_ctx, endpoint, &transport_writing->outbuf,
+ &transport_writing->done_cb);
+ } else {
+ grpc_exec_ctx_enqueue(exec_ctx, &transport_writing->done_cb, 1);
+ }
}
-static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) {
+static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_writing *transport_writing) {
grpc_chttp2_stream_writing *stream_writing;
GPR_TIMER_BEGIN("finalize_outbuf", 0);
while (
grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) {
- if (stream_writing->sopb.nops > 0 ||
- stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
- grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops,
- stream_writing->send_closed != GRPC_DONT_SEND_CLOSED,
- stream_writing->id,
- &transport_writing->hpack_compressor,
- &transport_writing->outbuf);
- stream_writing->sopb.nops = 0;
+ gpr_uint32 max_outgoing =
+ (gpr_uint32)GPR_MIN(GRPC_CHTTP2_MAX_PAYLOAD_LENGTH,
+ GPR_MIN(stream_writing->outgoing_window,
+ transport_writing->outgoing_window));
+ /* send initial metadata if it's available */
+ if (stream_writing->send_initial_metadata != NULL) {
+ grpc_chttp2_encode_header(
+ &transport_writing->hpack_compressor, stream_writing->id,
+ stream_writing->send_initial_metadata, 0, &transport_writing->outbuf);
+ stream_writing->send_initial_metadata = NULL;
+ stream_writing->sent_initial_metadata = 1;
}
- if (stream_writing->announce_window > 0) {
+ /* send any window updates */
+ if (stream_writing->announce_window > 0 &&
+ stream_writing->send_initial_metadata == NULL) {
+ gpr_uint32 announce = stream_writing->announce_window;
gpr_slice_buffer_add(
&transport_writing->outbuf,
grpc_chttp2_window_update_create(stream_writing->id,
stream_writing->announce_window));
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
- "write", transport_writing, stream_writing, announce_window,
- -(gpr_int64)stream_writing->announce_window);
+ GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", transport_writing, stream_writing,
+ announce_window, announce);
stream_writing->announce_window = 0;
}
- if (stream_writing->send_closed == GRPC_SEND_CLOSED_WITH_RST_STREAM) {
- gpr_slice_buffer_add(&transport_writing->outbuf,
- grpc_chttp2_rst_stream_create(stream_writing->id,
- GRPC_CHTTP2_NO_ERROR));
+ /* fetch any body bytes */
+ while (!stream_writing->fetching && stream_writing->send_message &&
+ stream_writing->flow_controlled_buffer.length < max_outgoing &&
+ stream_writing->stream_fetched <
+ stream_writing->send_message->length) {
+ if (grpc_byte_stream_next(exec_ctx, stream_writing->send_message,
+ &stream_writing->fetching_slice, max_outgoing,
+ &stream_writing->finished_fetch)) {
+ stream_writing->stream_fetched +=
+ GPR_SLICE_LENGTH(stream_writing->fetching_slice);
+ if (stream_writing->stream_fetched ==
+ stream_writing->send_message->length) {
+ stream_writing->send_message = NULL;
+ }
+ gpr_slice_buffer_add(&stream_writing->flow_controlled_buffer,
+ stream_writing->fetching_slice);
+ } else {
+ stream_writing->fetching = 1;
+ }
+ }
+ /* send any body bytes */
+ if (stream_writing->flow_controlled_buffer.length > 0) {
+ if (max_outgoing > 0) {
+ gpr_uint32 send_bytes = (gpr_uint32)GPR_MIN(
+ max_outgoing, stream_writing->flow_controlled_buffer.length);
+ int is_last_data_frame =
+ stream_writing->send_message == NULL &&
+ send_bytes == stream_writing->flow_controlled_buffer.length;
+ int is_last_frame = is_last_data_frame &&
+ stream_writing->send_trailing_metadata != NULL &&
+ grpc_metadata_batch_is_empty(
+ stream_writing->send_trailing_metadata);
+ grpc_chttp2_encode_data(
+ stream_writing->id, &stream_writing->flow_controlled_buffer,
+ send_bytes, is_last_frame, &transport_writing->outbuf);
+ GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", transport_writing,
+ stream_writing, outgoing_window,
+ send_bytes);
+ GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", transport_writing,
+ outgoing_window, send_bytes);
+ if (is_last_frame) {
+ stream_writing->send_trailing_metadata = NULL;
+ stream_writing->sent_trailing_metadata = 1;
+ }
+ if (is_last_data_frame) {
+ GPR_ASSERT(stream_writing->send_message == NULL);
+ stream_writing->sent_message = 1;
+ }
+ } else if (transport_writing->outgoing_window == 0) {
+ grpc_chttp2_list_add_stalled_by_transport(transport_writing,
+ stream_writing);
+ grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
+ }
+ }
+ /* send trailing metadata if it's available and we're ready for it */
+ if (stream_writing->send_message == NULL &&
+ stream_writing->flow_controlled_buffer.length == 0 &&
+ stream_writing->send_trailing_metadata != NULL) {
+ if (grpc_metadata_batch_is_empty(
+ stream_writing->send_trailing_metadata)) {
+ grpc_chttp2_encode_data(stream_writing->id,
+ &stream_writing->flow_controlled_buffer, 0, 1,
+ &transport_writing->outbuf);
+ } else {
+ grpc_chttp2_encode_header(&transport_writing->hpack_compressor,
+ stream_writing->id,
+ stream_writing->send_trailing_metadata, 1,
+ &transport_writing->outbuf);
+ }
+ if (!transport_writing->is_client && !stream_writing->read_closed) {
+ gpr_slice_buffer_add(&transport_writing->outbuf,
+ grpc_chttp2_rst_stream_create(
+ stream_writing->id, GRPC_CHTTP2_NO_ERROR));
+ }
+ stream_writing->send_trailing_metadata = NULL;
+ stream_writing->sent_trailing_metadata = 1;
+ }
+ /* if there's more to write, then loop, otherwise prepare to finish the
+ * write */
+ if ((stream_writing->flow_controlled_buffer.length > 0 ||
+ (stream_writing->send_message && !stream_writing->fetching)) &&
+ stream_writing->outgoing_window > 0) {
+ if (transport_writing->outgoing_window > 0) {
+ if (grpc_chttp2_list_add_writing_stream(transport_writing,
+ stream_writing)) {
+ /* do nothing - already reffed */
+ }
+ } else {
+ grpc_chttp2_list_add_stalled_by_transport(transport_writing,
+ stream_writing);
+ grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
+ }
+ } else {
+ grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
}
- grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
}
-
- GPR_TIMER_END("finalize_outbuf", 0);
}
void grpc_chttp2_cleanup_writing(
@@ -223,24 +325,26 @@ void grpc_chttp2_cleanup_writing(
while (grpc_chttp2_list_pop_written_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
- GPR_ASSERT(stream_global->writing_now != 0);
- if (stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
- stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE;
- if (!transport_global->is_client) {
- stream_global->read_closed = 1;
- }
+ if (stream_writing->sent_trailing_metadata) {
+ grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global,
+ !transport_global->is_client, 1);
}
- if (stream_global->writing_now & GRPC_CHTTP2_WRITING_DATA) {
- if (stream_global->outgoing_sopb != NULL &&
- stream_global->outgoing_sopb->nops == 0) {
- GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_QUEUED_CLOSE);
- stream_global->outgoing_sopb = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, stream_global->send_done_closure, 1);
- }
+ if (stream_writing->sent_initial_metadata) {
+ grpc_chttp2_complete_closure_step(
+ exec_ctx, &stream_global->send_initial_metadata_finished, 1);
+ }
+ if (stream_writing->sent_message) {
+ GPR_ASSERT(stream_writing->send_message == NULL);
+ GPR_ASSERT(stream_global->send_message_finished);
+ grpc_chttp2_complete_closure_step(
+ exec_ctx, &stream_global->send_message_finished, 1);
+ stream_writing->sent_message = 0;
+ }
+ if (stream_writing->sent_trailing_metadata) {
+ grpc_chttp2_complete_closure_step(
+ exec_ctx, &stream_global->send_trailing_metadata_finished, 1);
}
- stream_global->writing_now = 0;
- grpc_chttp2_list_add_read_write_state_changed(transport_global,
- stream_global);
+ GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing");
}
gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf);
}