diff options
author | Craig Tiller <ctiller@google.com> | 2015-11-02 14:16:12 -0800 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-11-02 14:16:12 -0800 |
commit | 9d35a1f9ce98e446d9036d64227b3c9e9317177a (patch) | |
tree | 77f2d34117faffcf5649fbd6bd66a850eea4e4d3 /src/core | |
parent | e8b5f627ccae2db13e366059c3738e96ac9a5d29 (diff) |
stream_op cleanup: transport changes
Diffstat (limited to 'src/core')
22 files changed, 1841 insertions, 1421 deletions
diff --git a/src/core/transport/byte_stream.c b/src/core/transport/byte_stream.c new file mode 100644 index 0000000000..81e8e77ccb --- /dev/null +++ b/src/core/transport/byte_stream.c @@ -0,0 +1,76 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/transport/byte_stream.h" + +#include <stdlib.h> + +#include <grpc/support/log.h> + +int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, gpr_slice *slice, + size_t max_size_hint, grpc_closure *on_complete) { + return byte_stream->next(exec_ctx, byte_stream, slice, max_size_hint, + on_complete); +} + +void grpc_byte_stream_destroy(grpc_byte_stream *byte_stream) { + byte_stream->destroy(byte_stream); +} + +/* slice_buffer_stream */ + +static int slice_buffer_stream_next(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + gpr_slice *slice, size_t max_size_hint, + grpc_closure *on_complete) { + grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream; + GPR_ASSERT(stream->cursor < stream->backing_buffer->count); + *slice = gpr_slice_ref(stream->backing_buffer->slices[stream->cursor]); + stream->cursor++; + return 1; +} + +static void slice_buffer_stream_destroy(grpc_byte_stream *byte_stream) {} + +void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream, + gpr_slice_buffer *slice_buffer, + gpr_uint32 flags) { + GPR_ASSERT(slice_buffer->length <= GPR_UINT32_MAX); + stream->base.length = (gpr_uint32)slice_buffer->length; + stream->base.flags = flags; + stream->base.next = slice_buffer_stream_next; + stream->base.destroy = slice_buffer_stream_destroy; + stream->backing_buffer = slice_buffer; + stream->cursor = 0; +} diff --git a/src/core/transport/byte_stream.h b/src/core/transport/byte_stream.h new file mode 100644 index 0000000000..c94d8ff275 --- /dev/null +++ b/src/core/transport/byte_stream.h @@ -0,0 +1,88 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_TRANSPORT_BYTE_STREAM_H +#define GRPC_INTERNAL_CORE_TRANSPORT_BYTE_STREAM_H + +#include "src/core/iomgr/exec_ctx.h" +#include <grpc/support/slice_buffer.h> + +/** Internal bit flag for grpc_begin_message's \a flags signaling the use of + * compression for the message */ +#define GRPC_WRITE_INTERNAL_COMPRESS (0x80000000u) +/** Mask of all valid internal flags. */ +#define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS) + +struct grpc_byte_stream; +typedef struct grpc_byte_stream grpc_byte_stream; + +struct grpc_byte_stream { + gpr_uint32 length; + gpr_uint32 flags; + int (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, + gpr_slice *slice, size_t max_size_hint, + grpc_closure *on_complete); + void (*destroy)(grpc_byte_stream *byte_stream); +}; + +/* returns 1 if the bytes are available immediately (in which case + * on_complete will not be called), 0 if the bytes will be available + * asynchronously. + * + * on entry, *remaining can be set as a hint as to the maximum number + * of bytes that would be acceptable to read. + * + * fills *buffer, *length, *remaining with the bytes, length of bytes + * and length of data remaining to be read before either returning 1 + * or calling on_complete. + * + * once a slice is returned into *slice, it is owned by the caller. + */ +int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, gpr_slice *slice, + size_t max_size_hint, grpc_closure *on_complete); + +void grpc_byte_stream_destroy(grpc_byte_stream *byte_stream); + +/* grpc_byte_stream that wraps a slice buffer */ +typedef struct grpc_slice_buffer_stream { + grpc_byte_stream base; + gpr_slice_buffer *backing_buffer; + size_t cursor; +} grpc_slice_buffer_stream; + +void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream, + gpr_slice_buffer *slice_buffer, + gpr_uint32 flags); + +#endif /* GRPC_INTERNAL_CORE_TRANSPORT_BYTE_STREAM_H */ diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c index 07179a4571..7287f97aaa 100644 --- a/src/core/transport/chttp2/frame_data.c +++ b/src/core/transport/chttp2/frame_data.c @@ -45,12 +45,15 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_init( grpc_chttp2_data_parser *parser) { parser->state = GRPC_CHTTP2_DATA_FH_0; - grpc_sopb_init(&parser->incoming_sopb); return GRPC_CHTTP2_PARSE_OK; } void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser *parser) { - grpc_sopb_destroy(&parser->incoming_sopb); + grpc_byte_stream *bs; + while ( + (bs = grpc_chttp2_incoming_frame_queue_pop(&parser->incoming_frames))) { + grpc_byte_stream_destroy(bs); + } } grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame( @@ -69,6 +72,62 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame( return GRPC_CHTTP2_PARSE_OK; } +void grpc_chttp2_incoming_frame_queue_merge( + grpc_chttp2_incoming_frame_queue *head_dst, + grpc_chttp2_incoming_frame_queue *tail_src) { + if (tail_src->head == NULL) { + return; + } + + if (head_dst->head == NULL) { + *head_dst = *tail_src; + memset(tail_src, 0, sizeof(*tail_src)); + return; + } + + head_dst->tail->next_message = tail_src->head; + head_dst->tail = tail_src->tail; + memset(tail_src, 0, sizeof(*tail_src)); +} + +grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop( + grpc_chttp2_incoming_frame_queue *q) { + grpc_byte_stream *out; + if (q->head == NULL) { + return NULL; + } + out = &q->head->base; + if (q->head == q->tail) { + memset(q, 0, sizeof(*q)); + } else { + q->head = q->head->next_message; + } + return out; +} + +void grpc_chttp2_encode_data(gpr_uint32 id, gpr_slice_buffer *inbuf, + gpr_uint32 write_bytes, int is_eof, + gpr_slice_buffer *outbuf) { + gpr_slice hdr; + gpr_uint8 *p; + + hdr = gpr_slice_malloc(9); + p = GPR_SLICE_START_PTR(hdr); + GPR_ASSERT(write_bytes < 16777316); + *p++ = (gpr_uint8)(write_bytes >> 16); + *p++ = (gpr_uint8)(write_bytes >> 8); + *p++ = (gpr_uint8)(write_bytes); + *p++ = GRPC_CHTTP2_FRAME_DATA; + *p++ = is_eof ? GRPC_CHTTP2_DATA_FLAG_END_STREAM : 0; + *p++ = (gpr_uint8)(id >> 24); + *p++ = (gpr_uint8)(id >> 16); + *p++ = (gpr_uint8)(id >> 8); + *p++ = (gpr_uint8)(id); + gpr_slice_buffer_add(outbuf, hdr); + + gpr_slice_buffer_move_first(inbuf, write_bytes, outbuf); +} + grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_transport_parsing *transport_parsing, @@ -77,7 +136,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( gpr_uint8 *const end = GPR_SLICE_END_PTR(slice); gpr_uint8 *cur = beg; grpc_chttp2_data_parser *p = parser; - gpr_uint32 message_flags = 0; + gpr_uint32 message_flags; + grpc_chttp2_incoming_byte_stream *incoming_byte_stream; if (is_last && p->is_last_frame) { stream_parsing->received_close = 1; @@ -132,11 +192,14 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( p->frame_size |= ((gpr_uint32)*cur); p->state = GRPC_CHTTP2_DATA_FRAME; ++cur; + message_flags = 0; if (p->is_frame_compressed) { message_flags |= GRPC_WRITE_INTERNAL_COMPRESS; } - grpc_sopb_add_begin_message(&p->incoming_sopb, p->frame_size, - message_flags); + p->parsing_frame = incoming_byte_stream = + grpc_chttp2_incoming_byte_stream_create( + transport_parsing, stream_parsing, p->frame_size, message_flags, + &p->incoming_frames); /* fallthrough */ case GRPC_CHTTP2_DATA_FRAME: if (cur == end) { @@ -147,20 +210,23 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing); if ((gpr_uint32)(end - cur) == p->frame_size) { - grpc_sopb_add_slice( - &p->incoming_sopb, + grpc_chttp2_incoming_byte_stream_push( + exec_ctx, p->parsing_frame, gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); + grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame); p->state = GRPC_CHTTP2_DATA_FH_0; return GRPC_CHTTP2_PARSE_OK; } else if ((gpr_uint32)(end - cur) > p->frame_size) { - grpc_sopb_add_slice(&p->incoming_sopb, - gpr_slice_sub(slice, (size_t)(cur - beg), - (size_t)(cur + p->frame_size - beg))); + grpc_chttp2_incoming_byte_stream_push( + exec_ctx, p->parsing_frame, + gpr_slice_sub(slice, (size_t)(cur - beg), + (size_t)(cur + p->frame_size - beg))); + grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame); cur += p->frame_size; goto fh_0; /* loop */ } else { - grpc_sopb_add_slice( - &p->incoming_sopb, + grpc_chttp2_incoming_byte_stream_push( + exec_ctx, p->parsing_frame, gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); GPR_ASSERT((size_t)(end - cur) <= p->frame_size); p->frame_size -= (gpr_uint32)(end - cur); diff --git a/src/core/transport/chttp2/frame_data.h b/src/core/transport/chttp2/frame_data.h index 6762484e5b..bc32f29d97 100644 --- a/src/core/transport/chttp2/frame_data.h +++ b/src/core/transport/chttp2/frame_data.h @@ -39,7 +39,7 @@ #include "src/core/iomgr/exec_ctx.h" #include <grpc/support/slice.h> #include <grpc/support/slice_buffer.h> -#include "src/core/transport/stream_op.h" +#include "src/core/transport/byte_stream.h" #include "src/core/transport/chttp2/frame.h" typedef enum { @@ -51,6 +51,14 @@ typedef enum { GRPC_CHTTP2_DATA_FRAME } grpc_chttp2_stream_state; +typedef struct grpc_chttp2_incoming_byte_stream + grpc_chttp2_incoming_byte_stream; + +typedef struct grpc_chttp2_incoming_frame_queue { + grpc_chttp2_incoming_byte_stream *head; + grpc_chttp2_incoming_byte_stream *tail; +} grpc_chttp2_incoming_frame_queue; + typedef struct { grpc_chttp2_stream_state state; gpr_uint8 is_last_frame; @@ -58,9 +66,16 @@ typedef struct { gpr_uint32 frame_size; int is_frame_compressed; - grpc_stream_op_buffer incoming_sopb; + grpc_chttp2_incoming_frame_queue incoming_frames; + grpc_chttp2_incoming_byte_stream *parsing_frame; } grpc_chttp2_data_parser; +void grpc_chttp2_incoming_frame_queue_merge( + grpc_chttp2_incoming_frame_queue *head_dst, + grpc_chttp2_incoming_frame_queue *tail_src); +grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop( + grpc_chttp2_incoming_frame_queue *q); + /* initialize per-stream state for data frame parsing */ grpc_chttp2_parse_error grpc_chttp2_data_parser_init( grpc_chttp2_data_parser *parser); @@ -81,4 +96,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( /* create a slice with an empty data frame and is_last set */ gpr_slice grpc_chttp2_data_frame_create_empty_close(gpr_uint32 id); +void grpc_chttp2_encode_data(gpr_uint32 id, gpr_slice_buffer *inbuf, + gpr_uint32 write_bytes, int is_eof, + gpr_slice_buffer *outbuf); + #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_DATA_H */ diff --git a/src/core/transport/chttp2/frame_window_update.c b/src/core/transport/chttp2/frame_window_update.c index 91bbcfe2c1..74ca29baf9 100644 --- a/src/core/transport/chttp2/frame_window_update.c +++ b/src/core/transport/chttp2/frame_window_update.c @@ -89,7 +89,8 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse( } if (p->byte == 4) { - if (p->amount == 0 || (p->amount & 0x80000000u)) { + gpr_uint32 received_update = p->amount; + if (received_update == 0 || (received_update & 0x80000000u)) { gpr_log(GPR_ERROR, "invalid window update bytes: %d", p->amount); return GRPC_CHTTP2_CONNECTION_ERROR; } @@ -97,17 +98,15 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse( if (transport_parsing->incoming_stream_id != 0) { if (stream_parsing != NULL) { - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("update", transport_parsing, - stream_parsing, outgoing_window_update, - p->amount); - stream_parsing->outgoing_window_update += p->amount; + GRPC_CHTTP2_FLOW_CREDIT_STREAM("parse", transport_parsing, + stream_parsing, outgoing_window, + received_update); grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing); } } else { - GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT("update", transport_parsing, - outgoing_window_update, p->amount); - transport_parsing->outgoing_window_update += p->amount; + GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parse", transport_parsing, + outgoing_window, received_update); } } diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/hpack_encoder.c index 24a5d958b8..6c7c00b7c3 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/hpack_encoder.c @@ -31,7 +31,7 @@ * */ -#include "src/core/transport/chttp2/stream_encoder.h" +#include "src/core/transport/chttp2/hpack_encoder.h" #include <assert.h> #include <string.h> @@ -54,18 +54,13 @@ /* don't consider adding anything bigger than this to the hpack table */ #define MAX_DECODER_SPACE_USAGE 512 -/* what kind of frame our we encoding? */ -typedef enum { HEADER, DATA, NONE } frame_type; - typedef struct { - frame_type cur_frame_type; + int is_first_frame; /* number of bytes in 'output' when we started the frame - used to calculate frame length */ size_t output_length_at_start_of_frame; /* index (in output) of the header for the current frame */ size_t header_idx; - /* was the last frame emitted a header? (if yes, we'll need a CONTINUATION */ - gpr_uint8 last_was_header; /* have we seen a regular (non-colon-prefixed) header yet? */ gpr_uint8 seen_regular_header; /* output stream id */ @@ -92,58 +87,35 @@ static void fill_header(gpr_uint8 *p, gpr_uint8 type, gpr_uint32 id, size_t len, static void finish_frame(framer_state *st, int is_header_boundary, int is_last_in_stream) { gpr_uint8 type = 0xff; - switch (st->cur_frame_type) { - case HEADER: - type = st->last_was_header ? GRPC_CHTTP2_FRAME_CONTINUATION - : GRPC_CHTTP2_FRAME_HEADER; - st->last_was_header = 1; - break; - case DATA: - type = GRPC_CHTTP2_FRAME_DATA; - st->last_was_header = 0; - is_header_boundary = 0; - break; - case NONE: - return; - } + type = st->is_first_frame ? GRPC_CHTTP2_FRAME_HEADER + : GRPC_CHTTP2_FRAME_CONTINUATION; fill_header( GPR_SLICE_START_PTR(st->output->slices[st->header_idx]), type, st->stream_id, st->output->length - st->output_length_at_start_of_frame, (gpr_uint8)( (is_last_in_stream ? GRPC_CHTTP2_DATA_FLAG_END_STREAM : 0) | (is_header_boundary ? GRPC_CHTTP2_DATA_FLAG_END_HEADERS : 0))); - st->cur_frame_type = NONE; + st->is_first_frame = 0; } /* begin a new frame: reserve off header space, remember how many bytes we'd output before beginning */ -static void begin_frame(framer_state *st, frame_type type) { - GPR_ASSERT(type != NONE); - GPR_ASSERT(st->cur_frame_type == NONE); - st->cur_frame_type = type; +static void begin_frame(framer_state *st) { st->header_idx = gpr_slice_buffer_add_indexed(st->output, gpr_slice_malloc(9)); st->output_length_at_start_of_frame = st->output->length; } -static void begin_new_frame(framer_state *st, frame_type type) { - finish_frame(st, 1, 0); - st->last_was_header = 0; - begin_frame(st, type); -} - /* make sure that the current frame is of the type desired, and has sufficient space to add at least about_to_add bytes -- finishes the current frame if needed */ -static void ensure_frame_type(framer_state *st, frame_type type, - size_t need_bytes) { - if (st->cur_frame_type == type && - st->output->length - st->output_length_at_start_of_frame + need_bytes <= - GRPC_CHTTP2_MAX_PAYLOAD_LENGTH) { +static void ensure_space(framer_state *st, size_t need_bytes) { + if (st->output->length - st->output_length_at_start_of_frame + need_bytes <= + GRPC_CHTTP2_MAX_PAYLOAD_LENGTH) { return; } - finish_frame(st, type != HEADER, 0); - begin_frame(st, type); + finish_frame(st, 0, 0); + begin_frame(st); } /* increment a filter count, halve all counts if one element reaches max */ @@ -165,31 +137,30 @@ static void add_header_data(framer_state *st, gpr_slice slice) { size_t len = GPR_SLICE_LENGTH(slice); size_t remaining; if (len == 0) return; - ensure_frame_type(st, HEADER, 1); remaining = GRPC_CHTTP2_MAX_PAYLOAD_LENGTH + st->output_length_at_start_of_frame - st->output->length; if (len <= remaining) { gpr_slice_buffer_add(st->output, slice); } else { gpr_slice_buffer_add(st->output, gpr_slice_split_head(&slice, remaining)); + finish_frame(st, 0, 0); + begin_frame(st); add_header_data(st, slice); } } static gpr_uint8 *add_tiny_header_data(framer_state *st, size_t len) { - ensure_frame_type(st, HEADER, len); + ensure_space(st, len); return gpr_slice_buffer_tiny_add(st->output, len); } -/* add an element to the decoder table: returns metadata element to unref */ -static grpc_mdelem *add_elem(grpc_chttp2_hpack_compressor *c, - grpc_mdelem *elem) { +/* add an element to the decoder table */ +static void add_elem(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem) { gpr_uint32 key_hash = elem->key->hash; gpr_uint32 elem_hash = GRPC_MDSTR_KV_HASH(key_hash, elem->value->hash); gpr_uint32 new_index = c->tail_remote_index + c->table_elems + 1; size_t elem_size = 32 + GPR_SLICE_LENGTH(elem->key->slice) + GPR_SLICE_LENGTH(elem->value->slice); - grpc_mdelem *elem_to_unref; GPR_ASSERT(elem_size < 65536); @@ -220,31 +191,27 @@ static grpc_mdelem *add_elem(grpc_chttp2_hpack_compressor *c, if (c->entries_elems[HASH_FRAGMENT_2(elem_hash)] == elem) { /* already there: update with new index */ c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index; - elem_to_unref = elem; } else if (c->entries_elems[HASH_FRAGMENT_3(elem_hash)] == elem) { /* already there (cuckoo): update with new index */ c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index; - elem_to_unref = elem; } else if (c->entries_elems[HASH_FRAGMENT_2(elem_hash)] == NULL) { /* not there, but a free element: add */ - c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = elem; + c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = GRPC_MDELEM_REF(elem); c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index; - elem_to_unref = NULL; } else if (c->entries_elems[HASH_FRAGMENT_3(elem_hash)] == NULL) { /* not there (cuckoo), but a free element: add */ - c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = elem; + c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = GRPC_MDELEM_REF(elem); c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index; - elem_to_unref = NULL; } else if (c->indices_elems[HASH_FRAGMENT_2(elem_hash)] < c->indices_elems[HASH_FRAGMENT_3(elem_hash)]) { /* not there: replace oldest */ - elem_to_unref = c->entries_elems[HASH_FRAGMENT_2(elem_hash)]; - c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = elem; + GRPC_MDELEM_UNREF(c->entries_elems[HASH_FRAGMENT_2(elem_hash)]); + c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = GRPC_MDELEM_REF(elem); c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index; } else { /* not there: replace oldest */ - elem_to_unref = c->entries_elems[HASH_FRAGMENT_3(elem_hash)]; - c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = elem; + GRPC_MDELEM_UNREF(c->entries_elems[HASH_FRAGMENT_3(elem_hash)]); + c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = GRPC_MDELEM_REF(elem); c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index; } @@ -270,8 +237,6 @@ static grpc_mdelem *add_elem(grpc_chttp2_hpack_compressor *c, c->entries_keys[HASH_FRAGMENT_3(key_hash)] = GRPC_MDSTR_REF(elem->key); c->indices_keys[HASH_FRAGMENT_3(key_hash)] = new_index; } - - return elem_to_unref; } static void emit_indexed(grpc_chttp2_hpack_compressor *c, gpr_uint32 elem_index, @@ -370,9 +335,9 @@ static gpr_uint32 dynidx(grpc_chttp2_hpack_compressor *c, c->table_elems - elem_index; } -/* encode an mdelem; returns metadata element to unref */ -static grpc_mdelem *hpack_enc(grpc_chttp2_hpack_compressor *c, - grpc_mdelem *elem, framer_state *st) { +/* encode an mdelem */ +static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem, + framer_state *st) { gpr_uint32 key_hash = elem->key->hash; gpr_uint32 elem_hash = GRPC_MDSTR_KV_HASH(key_hash, elem->value->hash); size_t decoder_space_usage; @@ -397,7 +362,7 @@ static grpc_mdelem *hpack_enc(grpc_chttp2_hpack_compressor *c, /* HIT: complete element (first cuckoo hash) */ emit_indexed(c, dynidx(c, c->indices_elems[HASH_FRAGMENT_2(elem_hash)]), st); - return elem; + return; } if (c->entries_elems[HASH_FRAGMENT_3(elem_hash)] == elem && @@ -405,7 +370,7 @@ static grpc_mdelem *hpack_enc(grpc_chttp2_hpack_compressor *c, /* HIT: complete element (second cuckoo hash) */ emit_indexed(c, dynidx(c, c->indices_elems[HASH_FRAGMENT_3(elem_hash)]), st); - return elem; + return; } /* should this elem be in the table? */ @@ -423,12 +388,13 @@ static grpc_mdelem *hpack_enc(grpc_chttp2_hpack_compressor *c, /* HIT: key (first cuckoo hash) */ if (should_add_elem) { emit_lithdr_incidx(c, dynidx(c, indices_key), elem, st); - return add_elem(c, elem); + add_elem(c, elem); + return; } else { emit_lithdr_noidx(c, dynidx(c, indices_key), elem, st); - return elem; + return; } - GPR_UNREACHABLE_CODE(return NULL); + GPR_UNREACHABLE_CODE(return ); } indices_key = c->indices_keys[HASH_FRAGMENT_3(key_hash)]; @@ -437,24 +403,26 @@ static grpc_mdelem *hpack_enc(grpc_chttp2_hpack_compressor *c, /* HIT: key (first cuckoo hash) */ if (should_add_elem) { emit_lithdr_incidx(c, dynidx(c, indices_key), elem, st); - return add_elem(c, elem); + add_elem(c, elem); + return; } else { emit_lithdr_noidx(c, dynidx(c, indices_key), elem, st); - return elem; + return; } - GPR_UNREACHABLE_CODE(return NULL); + GPR_UNREACHABLE_CODE(return ); } /* no elem, key in the table... fall back to literal emission */ if (should_add_elem) { emit_lithdr_incidx_v(c, elem, st); - return add_elem(c, elem); + add_elem(c, elem); + return; } else { emit_lithdr_noidx_v(c, elem, st); - return elem; + return; } - GPR_UNREACHABLE_CODE(return NULL); + GPR_UNREACHABLE_CODE(return ); } #define STRLEN_LIT(x) (sizeof(x) - 1) @@ -469,8 +437,8 @@ static void deadline_enc(grpc_chttp2_hpack_compressor *c, gpr_timespec deadline, mdelem = grpc_mdelem_from_metadata_strings( c->mdctx, GRPC_MDSTR_REF(c->timeout_key_str), grpc_mdstr_from_string(c->mdctx, timeout_str)); - mdelem = hpack_enc(c, mdelem, st); - if (mdelem) GRPC_MDELEM_UNREF(mdelem); + hpack_enc(c, mdelem, st); + GRPC_MDELEM_UNREF(mdelem); } gpr_slice grpc_chttp2_data_frame_create_empty_close(gpr_uint32 id) { @@ -495,169 +463,34 @@ void grpc_chttp2_hpack_compressor_destroy(grpc_chttp2_hpack_compressor *c) { GRPC_MDSTR_UNREF(c->timeout_key_str); } -gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count, - gpr_uint32 max_flow_controlled_bytes, - grpc_stream_op_buffer *outops) { - gpr_slice slice; - grpc_stream_op *op; - gpr_uint32 max_take_size; - gpr_uint32 flow_controlled_bytes_taken = 0; - gpr_uint32 curop = 0; - gpr_uint8 *p; - gpr_uint8 compressed_flag_set = 0; - - while (curop < *inops_count) { - GPR_ASSERT(flow_controlled_bytes_taken <= max_flow_controlled_bytes); - op = &inops[curop]; - switch (op->type) { - case GRPC_NO_OP: - /* skip */ - curop++; - break; - case GRPC_OP_METADATA: - grpc_metadata_batch_assert_ok(&op->data.metadata); - /* these just get copied as they don't impact the number of flow - controlled bytes */ - grpc_sopb_append(outops, op, 1); - curop++; - break; - case GRPC_OP_BEGIN_MESSAGE: - /* begin op: for now we just convert the op to a slice and fall - through - this lets us reuse the slice framing code below */ - compressed_flag_set = - (op->data.begin_message.flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0; - slice = gpr_slice_malloc(5); - - p = GPR_SLICE_START_PTR(slice); - p[0] = compressed_flag_set; - p[1] = (gpr_uint8)(op->data.begin_message.length >> 24); - p[2] = (gpr_uint8)(op->data.begin_message.length >> 16); - p[3] = (gpr_uint8)(op->data.begin_message.length >> 8); - p[4] = (gpr_uint8)(op->data.begin_message.length); - op->type = GRPC_OP_SLICE; - op->data.slice = slice; - /* fallthrough */ - case GRPC_OP_SLICE: - slice = op->data.slice; - if (!GPR_SLICE_LENGTH(slice)) { - /* skip zero length slices */ - gpr_slice_unref(slice); - curop++; - break; - } - max_take_size = max_flow_controlled_bytes - flow_controlled_bytes_taken; - if (max_take_size == 0) { - goto exit_loop; - } - if (GPR_SLICE_LENGTH(slice) > max_take_size) { - slice = gpr_slice_split_head(&op->data.slice, max_take_size); - grpc_sopb_add_slice(outops, slice); - } else { - /* consume this op immediately */ - grpc_sopb_append(outops, op, 1); - curop++; - } - flow_controlled_bytes_taken += (gpr_uint32)GPR_SLICE_LENGTH(slice); - break; - } - } -exit_loop: - *inops_count -= curop; - memmove(inops, inops + curop, *inops_count * sizeof(grpc_stream_op)); - - for (curop = 0; curop < *inops_count; curop++) { - if (inops[curop].type == GRPC_OP_METADATA) { - grpc_metadata_batch_assert_ok(&inops[curop].data.metadata); - } - } - - return flow_controlled_bytes_taken; -} - -void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, - gpr_uint32 stream_id, - grpc_chttp2_hpack_compressor *compressor, - gpr_slice_buffer *output) { +void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c, + gpr_uint32 stream_id, + grpc_metadata_batch *metadata, int is_eof, + gpr_slice_buffer *outbuf) { framer_state st; - gpr_slice slice; - grpc_stream_op *op; - size_t max_take_size; - gpr_uint32 curop = 0; - gpr_uint32 unref_op; grpc_linked_mdelem *l; - int need_unref = 0; gpr_timespec deadline; GPR_ASSERT(stream_id != 0); - st.cur_frame_type = NONE; - st.last_was_header = 0; st.seen_regular_header = 0; st.stream_id = stream_id; - st.output = output; - - while (curop < ops_count) { - op = &ops[curop]; - switch (op->type) { - case GRPC_NO_OP: - case GRPC_OP_BEGIN_MESSAGE: - gpr_log( - GPR_ERROR, - "These stream ops should be filtered out by grpc_chttp2_preencode"); - abort(); - case GRPC_OP_METADATA: - /* Encode a metadata batch; store the returned values, representing - a metadata element that needs to be unreffed back into the metadata - slot. THIS MAY NOT BE THE SAME ELEMENT (if a decoder table slot got - updated). After this loop, we'll do a batch unref of elements. */ - begin_new_frame(&st, HEADER); - need_unref |= op->data.metadata.garbage.head != NULL; - grpc_metadata_batch_assert_ok(&op->data.metadata); - for (l = op->data.metadata.list.head; l; l = l->next) { - l->md = hpack_enc(compressor, l->md, &st); - need_unref |= l->md != NULL; - } - deadline = op->data.metadata.deadline; - if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) != 0) { - deadline_enc(compressor, deadline, &st); - } - curop++; - break; - case GRPC_OP_SLICE: - slice = op->data.slice; - if (st.cur_frame_type == DATA && - st.output->length - st.output_length_at_start_of_frame == - GRPC_CHTTP2_MAX_PAYLOAD_LENGTH) { - finish_frame(&st, 0, 0); - } - ensure_frame_type(&st, DATA, 1); - max_take_size = GRPC_CHTTP2_MAX_PAYLOAD_LENGTH + - st.output_length_at_start_of_frame - st.output->length; - if (GPR_SLICE_LENGTH(slice) > max_take_size) { - slice = gpr_slice_split_head(&op->data.slice, max_take_size); - } else { - /* consume this op immediately */ - curop++; - } - gpr_slice_buffer_add(output, slice); - break; - } + st.output = outbuf; + st.is_first_frame = 1; + + /* Encode a metadata batch; store the returned values, representing + a metadata element that needs to be unreffed back into the metadata + slot. THIS MAY NOT BE THE SAME ELEMENT (if a decoder table slot got + updated). After this loop, we'll do a batch unref of elements. */ + begin_frame(&st); + grpc_metadata_batch_assert_ok(metadata); + for (l = metadata->list.head; l; l = l->next) { + hpack_enc(c, l->md, &st); } - if (eof && st.cur_frame_type == NONE) { - begin_frame(&st, DATA); - } - finish_frame(&st, 1, eof); - - if (need_unref) { - for (unref_op = 0; unref_op < curop; unref_op++) { - op = &ops[unref_op]; - if (op->type != GRPC_OP_METADATA) continue; - for (l = op->data.metadata.list.head; l; l = l->next) { - if (l->md) GRPC_MDELEM_UNREF(l->md); - } - for (l = op->data.metadata.garbage.head; l; l = l->next) { - GRPC_MDELEM_UNREF(l->md); - } - } + deadline = metadata->deadline; + if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) != 0) { + deadline_enc(c, deadline, &st); } + + finish_frame(&st, 1, is_eof); } diff --git a/src/core/transport/chttp2/stream_encoder.h b/src/core/transport/chttp2/hpack_encoder.h index db52f2a0f6..59b122dfda 100644 --- a/src/core/transport/chttp2/stream_encoder.h +++ b/src/core/transport/chttp2/hpack_encoder.h @@ -31,12 +31,12 @@ * */ -#ifndef GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_STREAM_ENCODER_H -#define GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_STREAM_ENCODER_H +#ifndef GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_HPACK_ENCODER_H +#define GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_HPACK_ENCODER_H #include "src/core/transport/chttp2/frame.h" #include "src/core/transport/metadata.h" -#include "src/core/transport/stream_op.h" +#include "src/core/transport/metadata_batch.h" #include <grpc/support/port_platform.h> #include <grpc/support/slice.h> #include <grpc/support/slice_buffer.h> @@ -78,16 +78,8 @@ void grpc_chttp2_hpack_compressor_init(grpc_chttp2_hpack_compressor *c, grpc_mdctx *mdctx); void grpc_chttp2_hpack_compressor_destroy(grpc_chttp2_hpack_compressor *c); -/* select stream ops to be encoded, moving them from inops to outops, and - moving subsequent ops in inops forward in the queue */ -gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count, - gpr_uint32 max_flow_controlled_bytes, - grpc_stream_op_buffer *outops); +void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c, gpr_uint32 id, + grpc_metadata_batch *metadata, int is_eof, + gpr_slice_buffer *outbuf); -/* encode stream ops to output */ -void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, - gpr_uint32 stream_id, - grpc_chttp2_hpack_compressor *compressor, - gpr_slice_buffer *output); - -#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_STREAM_ENCODER_H */ +#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_HPACK_ENCODER_H */ diff --git a/src/core/transport/chttp2/hpack_parser.c b/src/core/transport/chttp2/hpack_parser.c index 20d8312d54..6eebfc3ce4 100644 --- a/src/core/transport/chttp2/hpack_parser.c +++ b/src/core/transport/chttp2/hpack_parser.c @@ -38,13 +38,15 @@ #include <string.h> #include <assert.h> -#include "src/core/transport/chttp2/bin_encoder.h" -#include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/port_platform.h> #include <grpc/support/useful.h> +#include "src/core/profiling/timers.h" +#include "src/core/support/string.h" +#include "src/core/transport/chttp2/bin_encoder.h" + typedef enum { NOT_BINARY, B64_BYTE0, @@ -1379,20 +1381,23 @@ grpc_chttp2_parse_error grpc_chttp2_header_parser_parse( grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_slice slice, int is_last) { grpc_chttp2_hpack_parser *parser = hpack_parser; + GPR_TIMER_BEGIN("grpc_chttp2_hpack_parser_parse", 0); if (!grpc_chttp2_hpack_parser_parse(parser, GPR_SLICE_START_PTR(slice), GPR_SLICE_END_PTR(slice))) { + GPR_TIMER_END("grpc_chttp2_hpack_parser_parse", 0); return GRPC_CHTTP2_CONNECTION_ERROR; } if (is_last) { if (parser->is_boundary && parser->state != parse_begin) { gpr_log(GPR_ERROR, "end of header frame not aligned with a hpack record boundary"); + GPR_TIMER_END("grpc_chttp2_hpack_parser_parse", 0); return GRPC_CHTTP2_CONNECTION_ERROR; } if (parser->is_boundary) { - grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into( - &stream_parsing->incoming_metadata, - &stream_parsing->data_parser.incoming_sopb); + stream_parsing + ->got_metadata_on_parse[stream_parsing->header_frames_received] = 1; + stream_parsing->header_frames_received++; grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing); } @@ -1404,5 +1409,6 @@ grpc_chttp2_parse_error grpc_chttp2_header_parser_parse( parser->is_boundary = 0xde; parser->is_eof = 0xde; } + GPR_TIMER_END("grpc_chttp2_hpack_parser_parse", 0); return GRPC_CHTTP2_PARSE_OK; } diff --git a/src/core/transport/chttp2/incoming_metadata.c b/src/core/transport/chttp2/incoming_metadata.c index 10c64f3356..956afc8e9d 100644 --- a/src/core/transport/chttp2/incoming_metadata.c +++ b/src/core/transport/chttp2/incoming_metadata.c @@ -48,14 +48,27 @@ void grpc_chttp2_incoming_metadata_buffer_init( void grpc_chttp2_incoming_metadata_buffer_destroy( grpc_chttp2_incoming_metadata_buffer *buffer) { size_t i; + if (!buffer->published) { + for (i = 0; i < buffer->count; i++) { + GRPC_MDELEM_UNREF(buffer->elems[i].md); + } + } + gpr_free(buffer->elems); +} + +void grpc_chttp2_incoming_metadata_buffer_reset( + grpc_chttp2_incoming_metadata_buffer *buffer) { + size_t i; + GPR_ASSERT(!buffer->published); for (i = 0; i < buffer->count; i++) { GRPC_MDELEM_UNREF(buffer->elems[i].md); } - gpr_free(buffer->elems); + buffer->count = 0; } void grpc_chttp2_incoming_metadata_buffer_add( grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem *elem) { + GPR_ASSERT(!buffer->published); if (buffer->capacity == buffer->count) { buffer->capacity = GPR_MAX(8, 2 * buffer->capacity); buffer->elems = @@ -66,117 +79,36 @@ void grpc_chttp2_incoming_metadata_buffer_add( void grpc_chttp2_incoming_metadata_buffer_set_deadline( grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline) { + GPR_ASSERT(!buffer->published); buffer->deadline = deadline; } -void grpc_chttp2_incoming_metadata_live_op_buffer_end( - grpc_chttp2_incoming_metadata_live_op_buffer *buffer) { - gpr_free(buffer->elems); - buffer->elems = NULL; -} - -void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into( - grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb) { - grpc_metadata_batch b; - - b.list.head = NULL; - /* Store away the last element of the list, so that in patch_metadata_ops - we can reconstitute the list. - We can't do list building here as later incoming metadata may reallocate - the underlying array. */ - b.list.tail = (void *)(gpr_intptr)buffer->count; - b.garbage.head = b.garbage.tail = NULL; - b.deadline = buffer->deadline; - buffer->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); - - grpc_sopb_add_metadata(sopb, b); -} - void grpc_chttp2_incoming_metadata_buffer_swap( grpc_chttp2_incoming_metadata_buffer *a, grpc_chttp2_incoming_metadata_buffer *b) { + GPR_ASSERT(!a->published); + GPR_ASSERT(!b->published); GPR_SWAP(grpc_chttp2_incoming_metadata_buffer, *a, *b); } -void grpc_incoming_metadata_buffer_move_to_referencing_sopb( - grpc_chttp2_incoming_metadata_buffer *src, - grpc_chttp2_incoming_metadata_buffer *dst, grpc_stream_op_buffer *sopb) { - size_t delta; - size_t i; - dst->deadline = gpr_time_min(src->deadline, dst->deadline); - - if (src->count == 0) { - return; - } - if (dst->count == 0) { - grpc_chttp2_incoming_metadata_buffer_swap(src, dst); - return; - } - delta = dst->count; - if (dst->capacity < src->count + dst->count) { - dst->capacity = GPR_MAX(dst->capacity * 2, src->count + dst->count); - dst->elems = gpr_realloc(dst->elems, dst->capacity * sizeof(*dst->elems)); - } - memcpy(dst->elems + dst->count, src->elems, src->count * sizeof(*src->elems)); - dst->count += src->count; - for (i = 0; i < sopb->nops; i++) { - if (sopb->ops[i].type != GRPC_OP_METADATA) continue; - sopb->ops[i].data.metadata.list.tail = - (void *)(delta + (gpr_uintptr)sopb->ops[i].data.metadata.list.tail); - } - src->count = 0; -} - -void grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op( - grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb, - grpc_chttp2_incoming_metadata_live_op_buffer *live_op_buffer) { - grpc_stream_op *ops = sopb->ops; - size_t nops = sopb->nops; - size_t i; - size_t j; - size_t mdidx = 0; - size_t last_mdidx; - int found_metadata = 0; - - /* rework the array of metadata into a linked list, making use - of the breadcrumbs we left in metadata batches during - add_metadata_batch */ - for (i = 0; i < nops; i++) { - grpc_stream_op *op = &ops[i]; - if (op->type != GRPC_OP_METADATA) continue; - found_metadata = 1; - /* we left a breadcrumb indicating where the end of this list is, - and since we add sequentially, we know from the end of the last - segment where this segment begins */ - last_mdidx = (size_t)(gpr_intptr)(op->data.metadata.list.tail); - GPR_ASSERT(last_mdidx > mdidx); - GPR_ASSERT(last_mdidx <= buffer->count); - /* turn the array into a doubly linked list */ - op->data.metadata.list.head = &buffer->elems[mdidx]; - op->data.metadata.list.tail = &buffer->elems[last_mdidx - 1]; - for (j = mdidx + 1; j < last_mdidx; j++) { - buffer->elems[j].prev = &buffer->elems[j - 1]; - buffer->elems[j - 1].next = &buffer->elems[j]; +void grpc_chttp2_incoming_metadata_buffer_publish( + grpc_chttp2_incoming_metadata_buffer *buffer, grpc_metadata_batch *batch) { + GPR_ASSERT(!buffer->published); + buffer->published = 1; + if (buffer->count > 0) { + size_t i; + for (i = 1; i < buffer->count; i++) { + buffer->elems[i].prev = &buffer->elems[i - 1]; } - buffer->elems[mdidx].prev = NULL; - buffer->elems[last_mdidx - 1].next = NULL; - /* track where we're up to */ - mdidx = last_mdidx; - } - if (found_metadata) { - live_op_buffer->elems = buffer->elems; - if (mdidx != buffer->count) { - /* we have a partially read metadata batch still in incoming_metadata */ - size_t new_count = buffer->count - mdidx; - size_t copy_bytes = sizeof(*buffer->elems) * new_count; - GPR_ASSERT(mdidx < buffer->count); - buffer->elems = gpr_malloc(copy_bytes); - memcpy(buffer->elems, live_op_buffer->elems + mdidx, copy_bytes); - buffer->count = buffer->capacity = new_count; - } else { - buffer->elems = NULL; - buffer->count = 0; - buffer->capacity = 0; + for (i = 0; i < buffer->count - 1; i++) { + buffer->elems[i].next = &buffer->elems[i + 1]; } + buffer->elems[0].prev = NULL; + buffer->elems[buffer->count - 1].next = NULL; + batch->list.head = &buffer->elems[0]; + batch->list.tail = &buffer->elems[buffer->count - 1]; + } else { + batch->list.head = batch->list.tail = NULL; } + batch->deadline = buffer->deadline; } diff --git a/src/core/transport/chttp2/incoming_metadata.h b/src/core/transport/chttp2/incoming_metadata.h index 2f1de411ba..0e1dabe825 100644 --- a/src/core/transport/chttp2/incoming_metadata.h +++ b/src/core/transport/chttp2/incoming_metadata.h @@ -41,12 +41,9 @@ typedef struct { size_t count; size_t capacity; gpr_timespec deadline; + int published; } grpc_chttp2_incoming_metadata_buffer; -typedef struct { - grpc_linked_mdelem *elems; -} grpc_chttp2_incoming_metadata_live_op_buffer; - /** assumes everything initially zeroed */ void grpc_chttp2_incoming_metadata_buffer_init( grpc_chttp2_incoming_metadata_buffer *buffer); @@ -54,27 +51,12 @@ void grpc_chttp2_incoming_metadata_buffer_destroy( grpc_chttp2_incoming_metadata_buffer *buffer); void grpc_chttp2_incoming_metadata_buffer_reset( grpc_chttp2_incoming_metadata_buffer *buffer); +void grpc_chttp2_incoming_metadata_buffer_publish( + grpc_chttp2_incoming_metadata_buffer *buffer, grpc_metadata_batch *batch); void grpc_chttp2_incoming_metadata_buffer_add( grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem *elem); void grpc_chttp2_incoming_metadata_buffer_set_deadline( grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline); -/** extend sopb with a metadata batch; this must be post-processed by - grpc_chttp2_incoming_metadata_buffer_postprocess_sopb before being handed - out of the transport */ -void grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into( - grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb); - -void grpc_incoming_metadata_buffer_move_to_referencing_sopb( - grpc_chttp2_incoming_metadata_buffer *src, - grpc_chttp2_incoming_metadata_buffer *dst, grpc_stream_op_buffer *sopb); - -void grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op( - grpc_chttp2_incoming_metadata_buffer *buffer, grpc_stream_op_buffer *sopb, - grpc_chttp2_incoming_metadata_live_op_buffer *live_op_buffer); - -void grpc_chttp2_incoming_metadata_live_op_buffer_end( - grpc_chttp2_incoming_metadata_live_op_buffer *live_op_buffer); - #endif /* GRPC_INTERNAL_CORE_CHTTP2_INCOMING_METADATA_H */ diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index b35f8b5d88..b53c9dee0b 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -34,6 +34,8 @@ #ifndef GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H #define GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H +#include <assert.h> + #include "src/core/iomgr/endpoint.h" #include "src/core/transport/chttp2/frame.h" #include "src/core/transport/chttp2/frame_data.h" @@ -42,9 +44,9 @@ #include "src/core/transport/chttp2/frame_rst_stream.h" #include "src/core/transport/chttp2/frame_settings.h" #include "src/core/transport/chttp2/frame_window_update.h" +#include "src/core/transport/chttp2/hpack_encoder.h" #include "src/core/transport/chttp2/hpack_parser.h" #include "src/core/transport/chttp2/incoming_metadata.h" -#include "src/core/transport/chttp2/stream_encoder.h" #include "src/core/transport/chttp2/stream_map.h" #include "src/core/transport/connectivity_state.h" #include "src/core/transport/transport_impl.h" @@ -56,14 +58,14 @@ typedef struct grpc_chttp2_stream grpc_chttp2_stream; happen to them... this enum labels each list */ typedef enum { GRPC_CHTTP2_LIST_ALL_STREAMS, - GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED, + GRPC_CHTTP2_LIST_CHECK_READ_OPS, + GRPC_CHTTP2_LIST_UNANNOUNCED_INCOMING_WINDOW_AVAILABLE, GRPC_CHTTP2_LIST_WRITABLE, GRPC_CHTTP2_LIST_WRITING, GRPC_CHTTP2_LIST_WRITTEN, GRPC_CHTTP2_LIST_PARSING_SEEN, GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING, - GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING, - GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED, + GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT, /** streams that are waiting to start because there are too many concurrent streams on the connection */ GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY, @@ -113,22 +115,6 @@ typedef enum { GRPC_DTS_FRAME } grpc_chttp2_deframe_transport_state; -typedef enum { - GRPC_WRITE_STATE_OPEN, - GRPC_WRITE_STATE_QUEUED_CLOSE, - GRPC_WRITE_STATE_SENT_CLOSE -} grpc_chttp2_write_state; - -/* flags that can be or'd into stream_global::writing_now */ -#define GRPC_CHTTP2_WRITING_DATA 1 -#define GRPC_CHTTP2_WRITING_WINDOW 2 - -typedef enum { - GRPC_DONT_SEND_CLOSED = 0, - GRPC_SEND_CLOSED, - GRPC_SEND_CLOSED_WITH_RST_STREAM -} grpc_chttp2_send_closed; - typedef struct { grpc_chttp2_stream *head; grpc_chttp2_stream *tail; @@ -160,14 +146,28 @@ typedef struct grpc_chttp2_outstanding_ping { struct grpc_chttp2_outstanding_ping *prev; } grpc_chttp2_outstanding_ping; +/* forward declared in frame_data.h */ +struct grpc_chttp2_incoming_byte_stream { + grpc_byte_stream base; + gpr_refcount refs; + struct grpc_chttp2_incoming_byte_stream *next_message; + + grpc_chttp2_transport *transport; + grpc_chttp2_stream *stream; + int is_tail; + gpr_slice_buffer slices; + grpc_closure *on_next; + gpr_slice *next; +}; + typedef struct { /** data to write next write */ gpr_slice_buffer qbuf; /** window available for us to send to peer */ gpr_int64 outgoing_window; - /** window available for peer to send to us - updated after parse */ - gpr_uint32 incoming_window; + /** window available to announce to peer */ + gpr_int64 announce_incoming_window; /** how much window would we like to have for incoming_window */ gpr_uint32 connection_window_target; @@ -209,6 +209,7 @@ typedef struct { gpr_slice_buffer outbuf; /** hpack encoding */ grpc_chttp2_hpack_compressor hpack_compressor; + gpr_int64 outgoing_window; /** is this a client? */ gpr_uint8 is_client; /** callback for when writing is done */ @@ -233,6 +234,7 @@ struct grpc_chttp2_transport_parsing { gpr_slice_buffer qbuf; /* metadata object cache */ grpc_mdstr *str_grpc_timeout; + grpc_mdelem *elem_grpc_status_ok; /** parser for headers */ grpc_chttp2_hpack_parser hpack_parser; /** simple one shot parsers */ @@ -246,8 +248,7 @@ struct grpc_chttp2_transport_parsing { grpc_chttp2_goaway_parser goaway_parser; /** window available for peer to send to us */ - gpr_uint32 incoming_window; - gpr_uint32 incoming_window_delta; + gpr_int64 incoming_window; /** next stream id available at the time of beginning parsing */ gpr_uint32 next_stream_id; @@ -278,7 +279,7 @@ struct grpc_chttp2_transport_parsing { gpr_uint32 goaway_last_stream_index; gpr_slice goaway_text; - gpr_int64 outgoing_window_update; + gpr_int64 outgoing_window; /** pings awaiting responses */ grpc_chttp2_outstanding_ping pings; @@ -345,8 +346,8 @@ struct grpc_chttp2_transport { struct { /* accept stream callback */ - void (*accept_stream)(void *user_data, grpc_transport *transport, - const void *server_data); + void (*accept_stream)(grpc_exec_ctx *exec_ctx, void *user_data, + grpc_transport *transport, const void *server_data); void *accept_stream_user_data; /** connectivity tracking */ @@ -358,9 +359,6 @@ typedef struct { /** HTTP2 stream id for this stream, or zero if one has not been assigned */ gpr_uint32 id; - grpc_closure *send_done_closure; - grpc_closure *recv_done_closure; - /** window available for us to send to peer */ gpr_int64 outgoing_window; /** The number of bytes the upper layers have offered to receive. @@ -371,54 +369,66 @@ typedef struct { not yet announced to HTTP2 flow control. As the upper layers offer to read more bytes, this value increases. As we advertise incoming flow control window, this value decreases. */ - gpr_uint32 unannounced_incoming_window; - /** The number of bytes of HTTP2 flow control we have advertised. - As we advertise incoming flow control window, this value increases. - As bytes are read, this value decreases. - Updated after parse. */ - gpr_uint32 incoming_window; - /** stream ops the transport user would like to send */ - grpc_stream_op_buffer *outgoing_sopb; + gpr_uint32 unannounced_incoming_window_for_parse; + gpr_uint32 unannounced_incoming_window_for_writing; + /** things the upper layers would like to send */ + grpc_metadata_batch *send_initial_metadata; + grpc_closure *send_initial_metadata_finished; + grpc_byte_stream *send_message; + grpc_closure *send_message_finished; + grpc_metadata_batch *send_trailing_metadata; + grpc_closure *send_trailing_metadata_finished; + + grpc_metadata_batch *recv_initial_metadata; + grpc_closure *recv_initial_metadata_finished; + grpc_byte_stream **recv_message; + grpc_closure *recv_message_ready; + grpc_metadata_batch *recv_trailing_metadata; + grpc_closure *recv_trailing_metadata_finished; + /** when the application requests writes be closed, the write_closed is 'queued'; when the close is flow controlled into the send path, we are 'sending' it; when the write has been performed it is 'sent' */ - grpc_chttp2_write_state write_state; - /** is this stream closed (boolean) */ + gpr_uint8 write_closed; + /** is this stream reading half-closed (boolean) */ gpr_uint8 read_closed; - /** has this stream been cancelled? (boolean) */ - gpr_uint8 cancelled; - grpc_status_code cancelled_status; - /** have we told the upper layer that this stream is cancelled? */ - gpr_uint8 published_cancelled; + /** is this stream finished closing (and reportably closed) */ + gpr_uint8 finished_close; /** is this stream in the stream map? (boolean) */ gpr_uint8 in_stream_map; - /** bitmask of GRPC_CHTTP2_WRITING_xxx above */ - gpr_uint8 writing_now; - /** has anything been written to this stream? */ - gpr_uint8 written_anything; - - /** stream state already published to the upper layer */ - grpc_stream_state published_state; - /** address to publish next stream state to */ - grpc_stream_state *publish_state; - /** pointer to sop buffer to fill in with new stream ops */ - grpc_stream_op_buffer *publish_sopb; - grpc_stream_op_buffer incoming_sopb; + /** has this stream seen an error? if 1, then pending incoming frames + can be thrown away */ + gpr_uint8 seen_error; - /** incoming metadata */ - grpc_chttp2_incoming_metadata_buffer incoming_metadata; - grpc_chttp2_incoming_metadata_live_op_buffer outstanding_metadata; + gpr_uint8 published_initial_metadata; + gpr_uint8 published_trailing_metadata; + gpr_uint8 faked_trailing_metadata; + + grpc_chttp2_incoming_metadata_buffer received_initial_metadata; + grpc_chttp2_incoming_metadata_buffer received_trailing_metadata; + + grpc_chttp2_incoming_frame_queue incoming_frames; } grpc_chttp2_stream_global; typedef struct { /** HTTP2 stream id for this stream, or zero if one has not been assigned */ gpr_uint32 id; - /** sops that have passed flow control to be written */ - grpc_stream_op_buffer sopb; - /** how strongly should we indicate closure with the next write */ - grpc_chttp2_send_closed send_closed; + gpr_uint8 fetching; + gpr_uint8 sent_initial_metadata; + gpr_uint8 sent_message; + gpr_uint8 sent_trailing_metadata; + gpr_uint8 read_closed; + /** send this initial metadata */ + grpc_metadata_batch *send_initial_metadata; + grpc_byte_stream *send_message; + grpc_metadata_batch *send_trailing_metadata; + gpr_int64 outgoing_window; /** how much window should we announce? */ gpr_uint32 announce_window; + gpr_slice_buffer flow_controlled_buffer; + gpr_slice fetching_slice; + size_t stream_fetched; + grpc_closure finished_fetch; } grpc_chttp2_stream_writing; struct grpc_chttp2_stream_parsing { @@ -428,22 +438,29 @@ struct grpc_chttp2_stream_parsing { gpr_uint8 received_close; /** saw a rst_stream */ gpr_uint8 saw_rst_stream; - /** incoming_window has been reduced by this much during parsing */ - gpr_uint32 incoming_window_delta; + /** how many header frames have we received? */ + gpr_uint8 header_frames_received; + /** which metadata did we get (on this parse) */ + gpr_uint8 got_metadata_on_parse[2]; + /** should we raise the seen_error flag in transport_global */ + gpr_uint8 seen_error; /** window available for peer to send to us */ - gpr_uint32 incoming_window; + gpr_int64 incoming_window; /** parsing state for data frames */ grpc_chttp2_data_parser data_parser; /** reason give to rst_stream */ gpr_uint32 rst_stream_reason; - /* amount of window given */ - gpr_uint64 outgoing_window_update; + /** amount of window given */ + gpr_int64 outgoing_window; + /** number of bytes received - reset at end of parse thread execution */ + gpr_int64 received_bytes; /** incoming metadata */ - grpc_chttp2_incoming_metadata_buffer incoming_metadata; + grpc_chttp2_incoming_metadata_buffer metadata_buffer[2]; }; struct grpc_chttp2_stream { + grpc_stream_refcount *refcount; grpc_chttp2_stream_global global; grpc_chttp2_stream_writing writing; grpc_chttp2_stream_parsing parsing; @@ -504,21 +521,10 @@ void grpc_chttp2_list_remove_writable_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); -void grpc_chttp2_list_add_incoming_window_updated( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global); -int grpc_chttp2_list_pop_incoming_window_updated( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_global **stream_global, - grpc_chttp2_stream_parsing **stream_parsing); -void grpc_chttp2_list_remove_incoming_window_updated( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global); - -void grpc_chttp2_list_add_writing_stream( +/* returns 1 if stream added, 0 if it was already present */ +int grpc_chttp2_list_add_writing_stream( grpc_chttp2_transport_writing *transport_writing, - grpc_chttp2_stream_writing *stream_writing); + grpc_chttp2_stream_writing *stream_writing) GRPC_MUST_USE_RESULT; int grpc_chttp2_list_have_writing_streams( grpc_chttp2_transport_writing *transport_writing); int grpc_chttp2_list_pop_writing_stream( @@ -550,31 +556,44 @@ int grpc_chttp2_list_pop_waiting_for_concurrency( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global); -void grpc_chttp2_list_add_closed_waiting_for_parsing( +void grpc_chttp2_list_add_check_read_ops( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); -int grpc_chttp2_list_pop_closed_waiting_for_parsing( +int grpc_chttp2_list_pop_check_read_ops( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global); -void grpc_chttp2_list_add_cancelled_waiting_for_writing( +void grpc_chttp2_list_add_stalled_by_transport( + grpc_chttp2_transport_writing *transport_writing, + grpc_chttp2_stream_writing *stream_writing); +int grpc_chttp2_list_pop_stalled_by_transport( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global **stream_global); + +void grpc_chttp2_list_add_unannounced_incoming_window_available( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); -int grpc_chttp2_list_pop_cancelled_waiting_for_writing( +void grpc_chttp2_list_remove_unannounced_incoming_window_available( grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global **stream_global); + grpc_chttp2_stream_global *stream_global); +int grpc_chttp2_list_pop_unannounced_incoming_window_available( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_transport_parsing *transport_parsing, + grpc_chttp2_stream_global **stream_global, + grpc_chttp2_stream_parsing **stream_parsing); -void grpc_chttp2_list_add_read_write_state_changed( +void grpc_chttp2_list_add_closed_waiting_for_parsing( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); -int grpc_chttp2_list_pop_read_write_state_changed( +int grpc_chttp2_list_pop_closed_waiting_for_parsing( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global); grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream( grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id); grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( - grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id); + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, + gpr_uint32 id); void grpc_chttp2_add_incoming_goaway( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, @@ -592,7 +611,10 @@ void grpc_chttp2_for_all_streams( grpc_chttp2_stream_global *stream_global)); void grpc_chttp2_parsing_become_skip_parser( - grpc_chttp2_transport_parsing *transport_parsing); + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing); + +void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, + grpc_closure **pclosure, int success); #define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" #define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \ @@ -607,26 +629,122 @@ extern int grpc_flowctl_trace; else \ stmt -#define GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(reason, transport, context, var, \ - delta) \ - if (!(grpc_flowctl_trace)) { \ - } else { \ - grpc_chttp2_flowctl_trace(__FILE__, __LINE__, reason, #context, #var, \ - transport->is_client, context->id, \ - (gpr_int64)(context->var), (gpr_int64)(delta)); \ - } - -#define GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(reason, context, var, delta) \ - if (!(grpc_flowctl_trace)) { \ - } else { \ - grpc_chttp2_flowctl_trace(__FILE__, __LINE__, reason, #context, #var, \ - context->is_client, 0, \ - (gpr_int64)(context->var), (gpr_int64)(delta)); \ - } - -void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason, - const char *context, const char *var, - int is_client, gpr_uint32 stream_id, - gpr_int64 current_value, gpr_int64 delta); +typedef enum { + GRPC_CHTTP2_FLOWCTL_MOVE, + GRPC_CHTTP2_FLOWCTL_CREDIT, + GRPC_CHTTP2_FLOWCTL_DEBIT +} grpc_chttp2_flowctl_op; + +#define GRPC_CHTTP2_FLOW_MOVE_COMMON(phase, transport, id1, id2, dst_context, \ + dst_var, src_context, src_var) \ + do { \ + assert(id1 == id2); \ + if (grpc_flowctl_trace) { \ + grpc_chttp2_flowctl_trace( \ + __FILE__, __LINE__, phase, GRPC_CHTTP2_FLOWCTL_MOVE, #dst_context, \ + #dst_var, #src_context, #src_var, transport->is_client, id1, \ + dst_context->dst_var, src_context->src_var); \ + } \ + dst_context->dst_var += src_context->src_var; \ + src_context->src_var = 0; \ + } while (0) + +#define GRPC_CHTTP2_FLOW_MOVE_STREAM(phase, transport, dst_context, dst_var, \ + src_context, src_var) \ + GRPC_CHTTP2_FLOW_MOVE_COMMON(phase, transport, dst_context->id, \ + src_context->id, dst_context, dst_var, \ + src_context, src_var) +#define GRPC_CHTTP2_FLOW_MOVE_TRANSPORT(phase, dst_context, dst_var, \ + src_context, src_var) \ + GRPC_CHTTP2_FLOW_MOVE_COMMON(phase, dst_context, 0, 0, dst_context, dst_var, \ + src_context, src_var) + +#define GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, transport, id, dst_context, \ + dst_var, amount) \ + do { \ + if (grpc_flowctl_trace) { \ + grpc_chttp2_flowctl_trace(__FILE__, __LINE__, phase, \ + GRPC_CHTTP2_FLOWCTL_CREDIT, #dst_context, \ + #dst_var, NULL, #amount, transport->is_client, \ + id, dst_context->dst_var, amount); \ + } \ + dst_context->dst_var += amount; \ + } while (0) + +#define GRPC_CHTTP2_FLOW_CREDIT_STREAM(phase, transport, dst_context, dst_var, \ + amount) \ + GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, transport, dst_context->id, \ + dst_context, dst_var, amount) +#define GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT(phase, dst_context, dst_var, amount) \ + GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, dst_context, 0, dst_context, dst_var, \ + amount) + +#define GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, transport, id, dst_context, \ + dst_var, amount) \ + do { \ + if (grpc_flowctl_trace) { \ + grpc_chttp2_flowctl_trace(__FILE__, __LINE__, phase, \ + GRPC_CHTTP2_FLOWCTL_DEBIT, #dst_context, \ + #dst_var, NULL, #amount, transport->is_client, \ + id, dst_context->dst_var, amount); \ + } \ + dst_context->dst_var -= amount; \ + } while (0) + +#define GRPC_CHTTP2_FLOW_DEBIT_STREAM(phase, transport, dst_context, dst_var, \ + amount) \ + GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, transport, dst_context->id, \ + dst_context, dst_var, amount) +#define GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT(phase, dst_context, dst_var, amount) \ + GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, dst_context, 0, dst_context, dst_var, \ + amount) + +void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase, + grpc_chttp2_flowctl_op op, const char *context1, + const char *var1, const char *context2, + const char *var2, int is_client, + gpr_uint32 stream_id, gpr_int64 val1, + gpr_int64 val2); + +void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream, + grpc_status_code status, gpr_slice *details); +void grpc_chttp2_mark_stream_closed( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, int close_reads, + int close_writes); +void grpc_chttp2_start_writing(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global); + +#ifdef GRPC_STREAM_REFCOUNT_DEBUG +#define GRPC_CHTTP2_STREAM_REF(stream_global, reason) \ + grpc_chttp2_stream_ref(stream_global, reason) +#define GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, reason) \ + grpc_chttp2_stream_unref(exec_ctx, stream_global, reason) +void grpc_chttp2_stream_ref(grpc_chttp2_stream_global *stream_global, + const char *reason); +void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, + grpc_chttp2_stream_global *stream_global, + const char *reason); +#else +#define GRPC_CHTTP2_STREAM_REF(stream_global, reason) \ + grpc_chttp2_stream_ref(stream_global) +#define GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, reason) \ + grpc_chttp2_stream_unref(exec_ctx, stream_global) +void grpc_chttp2_stream_ref(grpc_chttp2_stream_global *stream_global); +void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, + grpc_chttp2_stream_global *stream_global); +#endif + +grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( + grpc_chttp2_transport_parsing *transport_parsing, + grpc_chttp2_stream_parsing *stream_parsing, gpr_uint32 frame_size, + gpr_uint32 flags, grpc_chttp2_incoming_frame_queue *add_to_queue); +void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, + grpc_chttp2_incoming_byte_stream *bs, + gpr_slice slice); +void grpc_chttp2_incoming_byte_stream_finished( + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs); #endif diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index 5d4d8e70c4..8cef8fbb77 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -42,22 +42,28 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> -static int init_frame_parser(grpc_chttp2_transport_parsing *transport_parsing); +static int init_frame_parser(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_parsing *transport_parsing); static int init_header_frame_parser( - grpc_chttp2_transport_parsing *transport_parsing, int is_continuation); + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, + int is_continuation); static int init_data_frame_parser( - grpc_chttp2_transport_parsing *transport_parsing); + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing); static int init_rst_stream_parser( - grpc_chttp2_transport_parsing *transport_parsing); + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing); static int init_settings_frame_parser( - grpc_chttp2_transport_parsing *transport_parsing); + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing); static int init_window_update_frame_parser( - grpc_chttp2_transport_parsing *transport_parsing); -static int init_ping_parser(grpc_chttp2_transport_parsing *transport_parsing); -static int init_goaway_parser(grpc_chttp2_transport_parsing *transport_parsing); + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing); +static int init_ping_parser(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_parsing *transport_parsing); +static int init_goaway_parser(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_parsing *transport_parsing); static int init_skip_frame_parser( - grpc_chttp2_transport_parsing *transport_parsing, int is_header); + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, + int is_header); static int parse_frame_slice(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, @@ -74,23 +80,11 @@ void grpc_chttp2_prepare_to_read( transport_parsing->next_stream_id = transport_global->next_stream_id; /* update the parsing view of incoming window */ - if (transport_parsing->incoming_window != transport_global->incoming_window) { - GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT( - "parse", transport_parsing, incoming_window, - (gpr_int64)transport_global->incoming_window - - (gpr_int64)transport_parsing->incoming_window); - transport_parsing->incoming_window = transport_global->incoming_window; - } - while (grpc_chttp2_list_pop_incoming_window_updated( + while (grpc_chttp2_list_pop_unannounced_incoming_window_available( transport_global, transport_parsing, &stream_global, &stream_parsing)) { - stream_parsing->id = stream_global->id; - if (stream_parsing->incoming_window != stream_global->incoming_window) { - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( - "parse", transport_parsing, stream_parsing, incoming_window, - (gpr_int64)stream_global->incoming_window - - (gpr_int64)stream_parsing->incoming_window); - stream_parsing->incoming_window = stream_global->incoming_window; - } + GRPC_CHTTP2_FLOW_MOVE_STREAM("parse", transport_parsing, stream_parsing, + incoming_window, stream_global, + unannounced_incoming_window_for_parse); } GPR_TIMER_END("grpc_chttp2_prepare_to_read", 0); @@ -101,6 +95,8 @@ void grpc_chttp2_publish_reads( grpc_chttp2_transport_parsing *transport_parsing) { grpc_chttp2_stream_global *stream_global; grpc_chttp2_stream_parsing *stream_parsing; + int was_zero; + int is_zero; /* transport_parsing->last_incoming_stream_id is used as last-grpc_chttp2_stream-id when @@ -144,98 +140,102 @@ void grpc_chttp2_publish_reads( } /* propagate flow control tokens to global state */ - if (transport_parsing->outgoing_window_update) { - GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT( - "parsed", transport_global, outgoing_window, - transport_parsing->outgoing_window_update); - GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT( - "parsed", transport_parsing, outgoing_window_update, - -(gpr_int64)transport_parsing->outgoing_window_update); - transport_global->outgoing_window += - transport_parsing->outgoing_window_update; - transport_parsing->outgoing_window_update = 0; - } - - if (transport_parsing->incoming_window_delta) { - GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT( - "parsed", transport_global, incoming_window, - -(gpr_int64)transport_parsing->incoming_window_delta); - GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT( - "parsed", transport_parsing, incoming_window_delta, - -(gpr_int64)transport_parsing->incoming_window_delta); - transport_global->incoming_window -= - transport_parsing->incoming_window_delta; - transport_parsing->incoming_window_delta = 0; + was_zero = transport_global->outgoing_window <= 0; + GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("parsed", transport_global, outgoing_window, + transport_parsing, outgoing_window); + is_zero = transport_global->outgoing_window <= 0; + if (was_zero && !is_zero) { + while (grpc_chttp2_list_pop_stalled_by_transport(transport_global, + &stream_global)) { + grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + } + } + + if (transport_parsing->incoming_window < + transport_global->connection_window_target * 3 / 4) { + gpr_int64 announce_bytes = transport_global->connection_window_target - + transport_parsing->incoming_window; + GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", transport_global, + announce_incoming_window, announce_bytes); + GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", transport_parsing, + incoming_window, announce_bytes); } /* for each stream that saw an update, fixup global state */ while (grpc_chttp2_list_pop_parsing_seen_stream( transport_global, transport_parsing, &stream_global, &stream_parsing)) { - /* update incoming flow control window */ - if (stream_parsing->incoming_window_delta) { - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( - "parsed", transport_parsing, stream_global, incoming_window, - -(gpr_int64)stream_parsing->incoming_window_delta); - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( - "parsed", transport_parsing, stream_parsing, incoming_window_delta, - -(gpr_int64)stream_parsing->incoming_window_delta); - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( - "parsed", transport_parsing, stream_global, max_recv_bytes, - -(gpr_int64)stream_parsing->incoming_window_delta); - stream_global->incoming_window -= stream_parsing->incoming_window_delta; - GPR_ASSERT(stream_global->max_recv_bytes >= - stream_parsing->incoming_window_delta); - stream_global->max_recv_bytes -= stream_parsing->incoming_window_delta; - stream_parsing->incoming_window_delta = 0; - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + if (stream_parsing->seen_error) { + stream_global->seen_error = 1; + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } /* update outgoing flow control window */ - if (stream_parsing->outgoing_window_update) { - int was_zero = stream_global->outgoing_window <= 0; - int is_zero; - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("parsed", transport_parsing, - stream_global, outgoing_window, - stream_parsing->outgoing_window_update); - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( - "parsed", transport_parsing, stream_parsing, outgoing_window_update, - -(gpr_int64)stream_parsing->outgoing_window_update); - GPR_ASSERT(stream_parsing->outgoing_window_update <= GPR_UINT32_MAX); - stream_global->outgoing_window += - (gpr_uint32)stream_parsing->outgoing_window_update; - stream_parsing->outgoing_window_update = 0; - is_zero = stream_global->outgoing_window <= 0; - if (was_zero && !is_zero) { - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); - } + was_zero = stream_global->outgoing_window <= 0; + GRPC_CHTTP2_FLOW_MOVE_STREAM("parsed", transport_global, stream_global, + outgoing_window, stream_parsing, + outgoing_window); + is_zero = stream_global->outgoing_window <= 0; + if (was_zero && !is_zero) { + grpc_chttp2_list_add_writable_stream(transport_global, stream_global); } - /* updating closed status */ - if (stream_parsing->received_close) { - stream_global->read_closed = 1; - grpc_chttp2_list_add_read_write_state_changed(transport_global, - stream_global); + stream_global->max_recv_bytes -= (gpr_uint32)GPR_MIN( + stream_global->max_recv_bytes, stream_parsing->received_bytes); + stream_parsing->received_bytes = 0; + + /* publish incoming stream ops */ + if (stream_global->incoming_frames.tail != NULL) { + stream_global->incoming_frames.tail->is_tail = 0; + } + if (stream_parsing->data_parser.incoming_frames.head != NULL) { + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + } + grpc_chttp2_incoming_frame_queue_merge( + &stream_global->incoming_frames, + &stream_parsing->data_parser.incoming_frames); + if (stream_global->incoming_frames.tail != NULL) { + stream_global->incoming_frames.tail->is_tail = 1; } + + if (!stream_global->published_initial_metadata && + stream_parsing->got_metadata_on_parse[0]) { + stream_parsing->got_metadata_on_parse[0] = 0; + stream_global->published_initial_metadata = 1; + GPR_SWAP(grpc_chttp2_incoming_metadata_buffer, + stream_parsing->metadata_buffer[0], + stream_global->received_initial_metadata); + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + } + if (!stream_global->published_trailing_metadata && + stream_parsing->got_metadata_on_parse[1]) { + stream_parsing->got_metadata_on_parse[1] = 0; + stream_global->published_trailing_metadata = 1; + GPR_SWAP(grpc_chttp2_incoming_metadata_buffer, + stream_parsing->metadata_buffer[1], + stream_global->received_trailing_metadata); + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + } + if (stream_parsing->saw_rst_stream) { - stream_global->cancelled = 1; - stream_global->cancelled_status = grpc_chttp2_http2_error_to_grpc_status( - (grpc_chttp2_error_code)stream_parsing->rst_stream_reason); - if (stream_parsing->rst_stream_reason == GRPC_CHTTP2_NO_ERROR) { - stream_global->published_cancelled = 1; + if (stream_parsing->rst_stream_reason != GRPC_CHTTP2_NO_ERROR) { + grpc_status_code status_code = grpc_chttp2_http2_error_to_grpc_status( + (grpc_chttp2_error_code)stream_parsing->rst_stream_reason); + char *status_details; + gpr_slice slice_details; + gpr_asprintf(&status_details, "Received RST_STREAM err=%d", + stream_parsing->rst_stream_reason); + slice_details = gpr_slice_from_copied_string(status_details); + gpr_free(status_details); + grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, + status_code, &slice_details); } - grpc_chttp2_list_add_read_write_state_changed(transport_global, - stream_global); + grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, + 1, 1); } - /* publish incoming stream ops */ - if (stream_parsing->data_parser.incoming_sopb.nops > 0) { - grpc_incoming_metadata_buffer_move_to_referencing_sopb( - &stream_parsing->incoming_metadata, &stream_global->incoming_metadata, - &stream_parsing->data_parser.incoming_sopb); - grpc_sopb_move_to(&stream_parsing->data_parser.incoming_sopb, - &stream_global->incoming_sopb); - grpc_chttp2_list_add_read_write_state_changed(transport_global, - stream_global); + if (stream_parsing->received_close) { + grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, + 1, 0); } } } @@ -363,7 +363,7 @@ int grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx, GPR_ASSERT(cur < end); transport_parsing->incoming_stream_id |= ((gpr_uint32)*cur); transport_parsing->deframe_state = GRPC_DTS_FRAME; - if (!init_frame_parser(transport_parsing)) { + if (!init_frame_parser(exec_ctx, transport_parsing)) { return 0; } if (transport_parsing->incoming_stream_id) { @@ -428,7 +428,8 @@ int grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx, GPR_UNREACHABLE_CODE(return 0); } -static int init_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) { +static int init_frame_parser(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_parsing *transport_parsing) { if (transport_parsing->expect_continuation_stream_id != 0) { if (transport_parsing->incoming_frame_type != GRPC_CHTTP2_FRAME_CONTINUATION) { @@ -445,30 +446,30 @@ static int init_frame_parser(grpc_chttp2_transport_parsing *transport_parsing) { transport_parsing->incoming_stream_id); return 0; } - return init_header_frame_parser(transport_parsing, 1); + return init_header_frame_parser(exec_ctx, transport_parsing, 1); } switch (transport_parsing->incoming_frame_type) { case GRPC_CHTTP2_FRAME_DATA: - return init_data_frame_parser(transport_parsing); + return init_data_frame_parser(exec_ctx, transport_parsing); case GRPC_CHTTP2_FRAME_HEADER: - return init_header_frame_parser(transport_parsing, 0); + return init_header_frame_parser(exec_ctx, transport_parsing, 0); case GRPC_CHTTP2_FRAME_CONTINUATION: gpr_log(GPR_ERROR, "Unexpected CONTINUATION frame"); return 0; case GRPC_CHTTP2_FRAME_RST_STREAM: - return init_rst_stream_parser(transport_parsing); + return init_rst_stream_parser(exec_ctx, transport_parsing); case GRPC_CHTTP2_FRAME_SETTINGS: - return init_settings_frame_parser(transport_parsing); + return init_settings_frame_parser(exec_ctx, transport_parsing); case GRPC_CHTTP2_FRAME_WINDOW_UPDATE: - return init_window_update_frame_parser(transport_parsing); + return init_window_update_frame_parser(exec_ctx, transport_parsing); case GRPC_CHTTP2_FRAME_PING: - return init_ping_parser(transport_parsing); + return init_ping_parser(exec_ctx, transport_parsing); case GRPC_CHTTP2_FRAME_GOAWAY: - return init_goaway_parser(transport_parsing); + return init_goaway_parser(exec_ctx, transport_parsing); default: gpr_log(GPR_ERROR, "Unknown frame type %02x", transport_parsing->incoming_frame_type); - return init_skip_frame_parser(transport_parsing, 0); + return init_skip_frame_parser(exec_ctx, transport_parsing, 0); } } @@ -482,7 +483,8 @@ static grpc_chttp2_parse_error skip_parser( static void skip_header(void *tp, grpc_mdelem *md) { GRPC_MDELEM_UNREF(md); } static int init_skip_frame_parser( - grpc_chttp2_transport_parsing *transport_parsing, int is_header) { + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, + int is_header) { if (is_header) { gpr_uint8 is_eoh = transport_parsing->expect_continuation_stream_id != 0; transport_parsing->parser = grpc_chttp2_header_parser_parse; @@ -499,65 +501,51 @@ static int init_skip_frame_parser( } void grpc_chttp2_parsing_become_skip_parser( - grpc_chttp2_transport_parsing *transport_parsing) { + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) { init_skip_frame_parser( - transport_parsing, + exec_ctx, transport_parsing, transport_parsing->parser == grpc_chttp2_header_parser_parse); } static grpc_chttp2_parse_error update_incoming_window( - grpc_chttp2_transport_parsing *transport_parsing, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing) { - if (transport_parsing->incoming_frame_size > - transport_parsing->incoming_window) { + gpr_uint32 incoming_frame_size = transport_parsing->incoming_frame_size; + if (incoming_frame_size > transport_parsing->incoming_window) { gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d", transport_parsing->incoming_frame_size, transport_parsing->incoming_window); return GRPC_CHTTP2_CONNECTION_ERROR; } - if (transport_parsing->incoming_frame_size > - stream_parsing->incoming_window) { + if (incoming_frame_size > stream_parsing->incoming_window) { gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d", transport_parsing->incoming_frame_size, stream_parsing->incoming_window); return GRPC_CHTTP2_CONNECTION_ERROR; } - GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT( - "data", transport_parsing, incoming_window, - -(gpr_int64)transport_parsing->incoming_frame_size); - GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT("data", transport_parsing, - incoming_window_delta, - transport_parsing->incoming_frame_size); - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( - "data", transport_parsing, stream_parsing, incoming_window, - -(gpr_int64)transport_parsing->incoming_frame_size); - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("data", transport_parsing, stream_parsing, - incoming_window_delta, - transport_parsing->incoming_frame_size); - - transport_parsing->incoming_window -= transport_parsing->incoming_frame_size; - transport_parsing->incoming_window_delta += - transport_parsing->incoming_frame_size; - stream_parsing->incoming_window -= transport_parsing->incoming_frame_size; - stream_parsing->incoming_window_delta += - transport_parsing->incoming_frame_size; + GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", transport_parsing, incoming_window, + incoming_frame_size); + GRPC_CHTTP2_FLOW_DEBIT_STREAM("parse", transport_parsing, stream_parsing, + incoming_window, incoming_frame_size); + stream_parsing->received_bytes += incoming_frame_size; + grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing); return GRPC_CHTTP2_PARSE_OK; } static int init_data_frame_parser( - grpc_chttp2_transport_parsing *transport_parsing) { + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) { grpc_chttp2_stream_parsing *stream_parsing = grpc_chttp2_parsing_lookup_stream(transport_parsing, transport_parsing->incoming_stream_id); grpc_chttp2_parse_error err = GRPC_CHTTP2_PARSE_OK; if (!stream_parsing || stream_parsing->received_close) - return init_skip_frame_parser(transport_parsing, 0); + return init_skip_frame_parser(exec_ctx, transport_parsing, 0); if (err == GRPC_CHTTP2_PARSE_OK) { - err = update_incoming_window(transport_parsing, stream_parsing); + err = update_incoming_window(exec_ctx, transport_parsing, stream_parsing); } if (err == GRPC_CHTTP2_PARSE_OK) { err = grpc_chttp2_data_parser_begin_frame( @@ -577,7 +565,7 @@ static int init_data_frame_parser( &transport_parsing->qbuf, grpc_chttp2_rst_stream_create(transport_parsing->incoming_stream_id, GRPC_CHTTP2_PROTOCOL_ERROR)); - return init_skip_frame_parser(transport_parsing, 0); + return init_skip_frame_parser(exec_ctx, transport_parsing, 0); case GRPC_CHTTP2_CONNECTION_ERROR: return 0; } @@ -586,11 +574,13 @@ static int init_data_frame_parser( static void free_timeout(void *p) { gpr_free(p); } -static void on_header(void *tp, grpc_mdelem *md) { +static void on_initial_header(void *tp, grpc_mdelem *md) { grpc_chttp2_transport_parsing *transport_parsing = tp; grpc_chttp2_stream_parsing *stream_parsing = transport_parsing->incoming_stream; + GPR_TIMER_BEGIN("on_initial_header", 0); + GPR_ASSERT(stream_parsing); GRPC_CHTTP2_IF_TRACING(gpr_log( @@ -598,6 +588,12 @@ static void on_header(void *tp, grpc_mdelem *md) { transport_parsing->is_client ? "CLI" : "SVR", grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value))); + if (md->key == transport_parsing->elem_grpc_status_ok->key && + md != transport_parsing->elem_grpc_status_ok) { + /* TODO(ctiller): check for a status like " 0" */ + stream_parsing->seen_error = 1; + } + if (md->key == transport_parsing->str_grpc_timeout) { gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout); if (!cached_timeout) { @@ -612,24 +608,57 @@ static void on_header(void *tp, grpc_mdelem *md) { grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); } grpc_chttp2_incoming_metadata_buffer_set_deadline( - &stream_parsing->incoming_metadata, + &stream_parsing->metadata_buffer[0], gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), *cached_timeout)); GRPC_MDELEM_UNREF(md); } else { - grpc_chttp2_incoming_metadata_buffer_add(&stream_parsing->incoming_metadata, - md); + grpc_chttp2_incoming_metadata_buffer_add( + &stream_parsing->metadata_buffer[0], md); } grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing); + + GPR_TIMER_END("on_initial_header", 0); +} + +static void on_trailing_header(void *tp, grpc_mdelem *md) { + grpc_chttp2_transport_parsing *transport_parsing = tp; + grpc_chttp2_stream_parsing *stream_parsing = + transport_parsing->incoming_stream; + + GPR_TIMER_BEGIN("on_trailing_header", 0); + + GPR_ASSERT(stream_parsing); + + GRPC_CHTTP2_IF_TRACING(gpr_log( + GPR_INFO, "HTTP:%d:TRL:%s: %s: %s", stream_parsing->id, + transport_parsing->is_client ? "CLI" : "SVR", + grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value))); + + if (md->key == transport_parsing->elem_grpc_status_ok->key && + md != transport_parsing->elem_grpc_status_ok) { + /* TODO(ctiller): check for a status like " 0" */ + stream_parsing->seen_error = 1; + } + + grpc_chttp2_incoming_metadata_buffer_add(&stream_parsing->metadata_buffer[1], + md); + + grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing); + + GPR_TIMER_END("on_trailing_header", 0); } static int init_header_frame_parser( - grpc_chttp2_transport_parsing *transport_parsing, int is_continuation) { + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, + int is_continuation) { gpr_uint8 is_eoh = (transport_parsing->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0; int via_accept = 0; grpc_chttp2_stream_parsing *stream_parsing; + /* TODO(ctiller): when to increment header_frames_received? */ + if (is_eoh) { transport_parsing->expect_continuation_stream_id = 0; } else { @@ -649,7 +678,7 @@ static int init_header_frame_parser( if (is_continuation) { gpr_log(GPR_ERROR, "grpc_chttp2_stream disbanded before CONTINUATION received"); - return init_skip_frame_parser(transport_parsing, 1); + return init_skip_frame_parser(exec_ctx, transport_parsing, 1); } if (transport_parsing->is_client) { if ((transport_parsing->incoming_stream_id & 1) && @@ -660,7 +689,7 @@ static int init_header_frame_parser( gpr_log(GPR_ERROR, "ignoring new grpc_chttp2_stream creation on client"); } - return init_skip_frame_parser(transport_parsing, 1); + return init_skip_frame_parser(exec_ctx, transport_parsing, 1); } else if (transport_parsing->last_incoming_stream_id > transport_parsing->incoming_stream_id) { gpr_log(GPR_ERROR, @@ -669,19 +698,19 @@ static int init_header_frame_parser( "id=%d, new grpc_chttp2_stream id=%d", transport_parsing->last_incoming_stream_id, transport_parsing->incoming_stream_id); - return init_skip_frame_parser(transport_parsing, 1); + return init_skip_frame_parser(exec_ctx, transport_parsing, 1); } else if ((transport_parsing->incoming_stream_id & 1) == 0) { gpr_log(GPR_ERROR, "ignoring grpc_chttp2_stream with non-client generated index %d", transport_parsing->incoming_stream_id); - return init_skip_frame_parser(transport_parsing, 1); + return init_skip_frame_parser(exec_ctx, transport_parsing, 1); } stream_parsing = transport_parsing->incoming_stream = grpc_chttp2_parsing_accept_stream( - transport_parsing, transport_parsing->incoming_stream_id); + exec_ctx, transport_parsing, transport_parsing->incoming_stream_id); if (stream_parsing == NULL) { gpr_log(GPR_ERROR, "grpc_chttp2_stream not accepted"); - return init_skip_frame_parser(transport_parsing, 1); + return init_skip_frame_parser(exec_ctx, transport_parsing, 1); } via_accept = 1; } else { @@ -691,11 +720,21 @@ static int init_header_frame_parser( if (stream_parsing->received_close) { gpr_log(GPR_ERROR, "skipping already closed grpc_chttp2_stream header"); transport_parsing->incoming_stream = NULL; - return init_skip_frame_parser(transport_parsing, 1); + return init_skip_frame_parser(exec_ctx, transport_parsing, 1); } transport_parsing->parser = grpc_chttp2_header_parser_parse; transport_parsing->parser_data = &transport_parsing->hpack_parser; - transport_parsing->hpack_parser.on_header = on_header; + switch (stream_parsing->header_frames_received) { + case 0: + transport_parsing->hpack_parser.on_header = on_initial_header; + break; + case 1: + transport_parsing->hpack_parser.on_header = on_trailing_header; + break; + case 2: + gpr_log(GPR_ERROR, "too many header frames received"); + return init_skip_frame_parser(exec_ctx, transport_parsing, 1); + } transport_parsing->hpack_parser.on_header_user_data = transport_parsing; transport_parsing->hpack_parser.is_boundary = is_eoh; transport_parsing->hpack_parser.is_eof = @@ -708,7 +747,7 @@ static int init_header_frame_parser( } static int init_window_update_frame_parser( - grpc_chttp2_transport_parsing *transport_parsing) { + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) { int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_window_update_parser_begin_frame( &transport_parsing->simple.window_update, transport_parsing->incoming_frame_size, @@ -722,7 +761,8 @@ static int init_window_update_frame_parser( return ok; } -static int init_ping_parser(grpc_chttp2_transport_parsing *transport_parsing) { +static int init_ping_parser(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_parsing *transport_parsing) { int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_ping_parser_begin_frame( &transport_parsing->simple.ping, transport_parsing->incoming_frame_size, @@ -733,7 +773,7 @@ static int init_ping_parser(grpc_chttp2_transport_parsing *transport_parsing) { } static int init_rst_stream_parser( - grpc_chttp2_transport_parsing *transport_parsing) { + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) { int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_rst_stream_parser_begin_frame( &transport_parsing->simple.rst_stream, transport_parsing->incoming_frame_size, @@ -741,7 +781,7 @@ static int init_rst_stream_parser( transport_parsing->incoming_stream = grpc_chttp2_parsing_lookup_stream( transport_parsing, transport_parsing->incoming_stream_id); if (!transport_parsing->incoming_stream) { - return init_skip_frame_parser(transport_parsing, 0); + return init_skip_frame_parser(exec_ctx, transport_parsing, 0); } transport_parsing->parser = grpc_chttp2_rst_stream_parser_parse; transport_parsing->parser_data = &transport_parsing->simple.rst_stream; @@ -749,7 +789,7 @@ static int init_rst_stream_parser( } static int init_goaway_parser( - grpc_chttp2_transport_parsing *transport_parsing) { + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) { int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_goaway_parser_begin_frame( &transport_parsing->goaway_parser, transport_parsing->incoming_frame_size, @@ -760,7 +800,7 @@ static int init_goaway_parser( } static int init_settings_frame_parser( - grpc_chttp2_transport_parsing *transport_parsing) { + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) { int ok; if (transport_parsing->incoming_stream_id != 0) { @@ -806,7 +846,7 @@ static int parse_frame_slice(grpc_exec_ctx *exec_ctx, } return 1; case GRPC_CHTTP2_STREAM_ERROR: - grpc_chttp2_parsing_become_skip_parser(transport_parsing); + grpc_chttp2_parsing_become_skip_parser(exec_ctx, transport_parsing); if (stream_parsing) { stream_parsing->saw_rst_stream = 1; stream_parsing->rst_stream_reason = GRPC_CHTTP2_PROTOCOL_ERROR; diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index 781db7b0d6..a81ffcce79 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -142,12 +142,13 @@ static void stream_list_add_tail(grpc_chttp2_transport *t, s->included[id] = 1; } -static void stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s, - grpc_chttp2_stream_list_id id) { +static int stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s, + grpc_chttp2_stream_list_id id) { if (s->included[id]) { - return; + return 0; } stream_list_add_tail(t, s, id); + return 1; } /* wrappers for specializations */ @@ -192,12 +193,12 @@ void grpc_chttp2_list_remove_writable_stream( GRPC_CHTTP2_LIST_WRITABLE); } -void grpc_chttp2_list_add_writing_stream( +int grpc_chttp2_list_add_writing_stream( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing) { - stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), - STREAM_FROM_WRITING(stream_writing), - GRPC_CHTTP2_LIST_WRITING); + return stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), + STREAM_FROM_WRITING(stream_writing), + GRPC_CHTTP2_LIST_WRITING); } int grpc_chttp2_list_have_writing_streams( @@ -241,6 +242,40 @@ int grpc_chttp2_list_pop_written_stream( return r; } +void grpc_chttp2_list_add_unannounced_incoming_window_available( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global) { + GPR_ASSERT(stream_global->id != 0); + stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), + STREAM_FROM_GLOBAL(stream_global), + GRPC_CHTTP2_LIST_UNANNOUNCED_INCOMING_WINDOW_AVAILABLE); +} + +void grpc_chttp2_list_remove_unannounced_incoming_window_available( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global) { + stream_list_maybe_remove( + TRANSPORT_FROM_GLOBAL(transport_global), + STREAM_FROM_GLOBAL(stream_global), + GRPC_CHTTP2_LIST_UNANNOUNCED_INCOMING_WINDOW_AVAILABLE); +} + +int grpc_chttp2_list_pop_unannounced_incoming_window_available( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_transport_parsing *transport_parsing, + grpc_chttp2_stream_global **stream_global, + grpc_chttp2_stream_parsing **stream_parsing) { + grpc_chttp2_stream *stream; + int r = + stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, + GRPC_CHTTP2_LIST_UNANNOUNCED_INCOMING_WINDOW_AVAILABLE); + if (r != 0) { + *stream_global = &stream->global; + *stream_parsing = &stream->parsing; + } + return r; +} + void grpc_chttp2_list_add_parsing_seen_stream( grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing) { @@ -284,91 +319,60 @@ int grpc_chttp2_list_pop_waiting_for_concurrency( return r; } -void grpc_chttp2_list_add_closed_waiting_for_parsing( +void grpc_chttp2_list_add_check_read_ops( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), - GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING); + GRPC_CHTTP2_LIST_CHECK_READ_OPS); } -int grpc_chttp2_list_pop_closed_waiting_for_parsing( +int grpc_chttp2_list_pop_check_read_ops( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global) { grpc_chttp2_stream *stream; int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, - GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING); + GRPC_CHTTP2_LIST_CHECK_READ_OPS); if (r != 0) { *stream_global = &stream->global; } return r; } -void grpc_chttp2_list_add_cancelled_waiting_for_writing( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global) { - stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), - STREAM_FROM_GLOBAL(stream_global), - GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING); +void grpc_chttp2_list_add_stalled_by_transport( + grpc_chttp2_transport_writing *transport_writing, + grpc_chttp2_stream_writing *stream_writing) { + stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), + STREAM_FROM_WRITING(stream_writing), + GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); } -int grpc_chttp2_list_pop_cancelled_waiting_for_writing( +int grpc_chttp2_list_pop_stalled_by_transport( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global) { grpc_chttp2_stream *stream; int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, - GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING); + GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); if (r != 0) { *stream_global = &stream->global; } return r; } -void grpc_chttp2_list_add_incoming_window_updated( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global) { - stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), - STREAM_FROM_GLOBAL(stream_global), - GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED); -} - -int grpc_chttp2_list_pop_incoming_window_updated( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_transport_parsing *transport_parsing, - grpc_chttp2_stream_global **stream_global, - grpc_chttp2_stream_parsing **stream_parsing) { - grpc_chttp2_stream *stream; - int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, - GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED); - if (r != 0) { - *stream_global = &stream->global; - *stream_parsing = &stream->parsing; - } - return r; -} - -void grpc_chttp2_list_remove_incoming_window_updated( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global) { - stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global), - STREAM_FROM_GLOBAL(stream_global), - GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED); -} - -void grpc_chttp2_list_add_read_write_state_changed( +void grpc_chttp2_list_add_closed_waiting_for_parsing( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), - GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED); + GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING); } -int grpc_chttp2_list_pop_read_write_state_changed( +int grpc_chttp2_list_pop_closed_waiting_for_parsing( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global) { grpc_chttp2_stream *stream; int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, - GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED); + GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING); if (r != 0) { *stream_global = &stream->global; } diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index 69ad8854ba..353e476e40 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -40,15 +40,16 @@ #include "src/core/profiling/timers.h" #include "src/core/transport/chttp2/http2_errors.h" -static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing); +static void finalize_outbuf(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_writing *transport_writing); int grpc_chttp2_unlocking_check_writes( grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing) { grpc_chttp2_stream_global *stream_global; grpc_chttp2_stream_writing *stream_writing; - grpc_chttp2_stream_global *first_reinserted_stream = NULL; - gpr_uint32 window_delta; + + GPR_TIMER_BEGIN("grpc_chttp2_unlocking_check_writes", 0); /* simple writes are queued to qbuf, and flushed here */ gpr_slice_buffer_swap(&transport_global->qbuf, &transport_writing->outbuf); @@ -67,98 +68,103 @@ int grpc_chttp2_unlocking_check_writes( transport_global->sent_local_settings = 1; } + GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window, + transport_global, outgoing_window); + /* for each grpc_chttp2_stream that's become writable, frame it's data (according to available window sizes) and add to the output buffer */ while (grpc_chttp2_list_pop_writable_stream( transport_global, transport_writing, &stream_global, &stream_writing)) { - if (stream_global == first_reinserted_stream) { - /* prevent infinite loop */ - grpc_chttp2_list_add_first_writable_stream(transport_global, - stream_global); - break; - } + gpr_uint8 sent_initial_metadata; stream_writing->id = stream_global->id; - stream_writing->send_closed = GRPC_DONT_SEND_CLOSED; - - if (stream_global->outgoing_sopb) { - window_delta = grpc_chttp2_preencode( - stream_global->outgoing_sopb->ops, - &stream_global->outgoing_sopb->nops, - (gpr_uint32)GPR_MIN(GPR_MIN(transport_global->outgoing_window, - stream_global->outgoing_window), - GPR_UINT32_MAX), - &stream_writing->sopb); - GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT( - "write", transport_global, outgoing_window, -(gpr_int64)window_delta); - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global, - outgoing_window, - -(gpr_int64)window_delta); - transport_global->outgoing_window -= window_delta; - stream_global->outgoing_window -= window_delta; - - if (stream_global->write_state == GRPC_WRITE_STATE_QUEUED_CLOSE && - stream_global->outgoing_sopb->nops == 0) { - if (!transport_global->is_client && !stream_global->read_closed) { - stream_writing->send_closed = GRPC_SEND_CLOSED_WITH_RST_STREAM; + stream_writing->read_closed = stream_global->read_closed; + + GRPC_CHTTP2_FLOW_MOVE_STREAM("write", transport_writing, stream_writing, + outgoing_window, stream_global, + outgoing_window); + + sent_initial_metadata = stream_writing->sent_initial_metadata; + if (!sent_initial_metadata && stream_global->send_initial_metadata) { + stream_writing->send_initial_metadata = + stream_global->send_initial_metadata; + stream_global->send_initial_metadata = NULL; + if (grpc_chttp2_list_add_writing_stream(transport_writing, + stream_writing)) { + GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); + } + sent_initial_metadata = 1; + } + if (sent_initial_metadata) { + if (stream_global->send_message != NULL) { + gpr_slice hdr = gpr_slice_malloc(5); + gpr_uint8 *p = GPR_SLICE_START_PTR(hdr); + gpr_uint32 len = stream_global->send_message->length; + GPR_ASSERT(stream_writing->send_message == NULL); + p[0] = (stream_global->send_message->flags & + GRPC_WRITE_INTERNAL_COMPRESS) != 0; + p[1] = (gpr_uint8)(len >> 24); + p[2] = (gpr_uint8)(len >> 16); + p[3] = (gpr_uint8)(len >> 8); + p[4] = (gpr_uint8)(len); + gpr_slice_buffer_add(&stream_writing->flow_controlled_buffer, hdr); + if (stream_global->send_message->length > 0) { + stream_writing->send_message = stream_global->send_message; } else { - stream_writing->send_closed = GRPC_SEND_CLOSED; + stream_writing->send_message = NULL; } + stream_writing->stream_fetched = 0; + stream_global->send_message = NULL; } - - if (stream_global->outgoing_window > 0 && - stream_global->outgoing_sopb->nops != 0) { - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); - if (first_reinserted_stream == NULL && - transport_global->outgoing_window == 0) { - first_reinserted_stream = stream_global; + if ((stream_writing->send_message != NULL || + stream_writing->flow_controlled_buffer.length > 0) && + stream_writing->outgoing_window > 0) { + if (transport_writing->outgoing_window > 0) { + if (grpc_chttp2_list_add_writing_stream(transport_writing, + stream_writing)) { + GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); + } + } else { + grpc_chttp2_list_add_stalled_by_transport(transport_writing, + stream_writing); + } + } + if (stream_global->send_trailing_metadata) { + stream_writing->send_trailing_metadata = + stream_global->send_trailing_metadata; + stream_global->send_trailing_metadata = NULL; + if (grpc_chttp2_list_add_writing_stream(transport_writing, + stream_writing)) { + GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); } } } if (!stream_global->read_closed && - stream_global->unannounced_incoming_window > 0) { - GPR_ASSERT(stream_writing->announce_window == 0); - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( - "write", transport_writing, stream_writing, announce_window, - stream_global->unannounced_incoming_window); - stream_writing->announce_window = - stream_global->unannounced_incoming_window; - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( - "write", transport_global, stream_global, incoming_window, - stream_global->unannounced_incoming_window); - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( - "write", transport_global, stream_global, unannounced_incoming_window, - -(gpr_int64)stream_global->unannounced_incoming_window); - stream_global->incoming_window += - stream_global->unannounced_incoming_window; - stream_global->unannounced_incoming_window = 0; - grpc_chttp2_list_add_incoming_window_updated(transport_global, - stream_global); - stream_global->writing_now |= GRPC_CHTTP2_WRITING_WINDOW; - } - if (stream_writing->sopb.nops > 0 || - stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) { - stream_global->writing_now |= GRPC_CHTTP2_WRITING_DATA; - } - if (stream_global->writing_now != 0) { - grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); + stream_global->unannounced_incoming_window_for_writing > 1024) { + GRPC_CHTTP2_FLOW_MOVE_STREAM("write", transport_global, stream_writing, + announce_window, stream_global, + unannounced_incoming_window_for_writing); + if (grpc_chttp2_list_add_writing_stream(transport_writing, + stream_writing)) { + GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); + } } } /* if the grpc_chttp2_transport is ready to send a window update, do so here also; 3/4 is a magic number that will likely get tuned soon */ - if (transport_global->incoming_window < - transport_global->connection_window_target * 3 / 4) { - window_delta = transport_global->connection_window_target - - transport_global->incoming_window; + if (transport_global->announce_incoming_window > 0) { + gpr_uint32 announced = (gpr_uint32)GPR_MIN( + transport_global->announce_incoming_window, GPR_UINT32_MAX); + GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", transport_global, + announce_incoming_window, announced); gpr_slice_buffer_add(&transport_writing->outbuf, - grpc_chttp2_window_update_create(0, window_delta)); - GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT("write", transport_global, - incoming_window, window_delta); - transport_global->incoming_window += window_delta; + grpc_chttp2_window_update_create(0, announced)); } + GPR_TIMER_END("grpc_chttp2_unlocking_check_writes", 0); + return transport_writing->outbuf.count > 0 || grpc_chttp2_list_have_writing_streams(transport_writing); } @@ -169,50 +175,146 @@ void grpc_chttp2_perform_writes( GPR_ASSERT(transport_writing->outbuf.count > 0 || grpc_chttp2_list_have_writing_streams(transport_writing)); - finalize_outbuf(transport_writing); + finalize_outbuf(exec_ctx, transport_writing); - GPR_ASSERT(transport_writing->outbuf.count > 0); GPR_ASSERT(endpoint); - grpc_endpoint_write(exec_ctx, endpoint, &transport_writing->outbuf, - &transport_writing->done_cb); + if (transport_writing->outbuf.count > 0) { + grpc_endpoint_write(exec_ctx, endpoint, &transport_writing->outbuf, + &transport_writing->done_cb); + } else { + grpc_exec_ctx_enqueue(exec_ctx, &transport_writing->done_cb, 1); + } } -static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) { +static void finalize_outbuf(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_writing *transport_writing) { grpc_chttp2_stream_writing *stream_writing; GPR_TIMER_BEGIN("finalize_outbuf", 0); while ( grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) { - if (stream_writing->sopb.nops > 0 || - stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) { - grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops, - stream_writing->send_closed != GRPC_DONT_SEND_CLOSED, - stream_writing->id, - &transport_writing->hpack_compressor, - &transport_writing->outbuf); - stream_writing->sopb.nops = 0; + gpr_uint32 max_outgoing = + (gpr_uint32)GPR_MIN(GRPC_CHTTP2_MAX_PAYLOAD_LENGTH, + GPR_MIN(stream_writing->outgoing_window, + transport_writing->outgoing_window)); + /* send initial metadata if it's available */ + if (stream_writing->send_initial_metadata != NULL) { + grpc_chttp2_encode_header( + &transport_writing->hpack_compressor, stream_writing->id, + stream_writing->send_initial_metadata, 0, &transport_writing->outbuf); + stream_writing->send_initial_metadata = NULL; + stream_writing->sent_initial_metadata = 1; } - if (stream_writing->announce_window > 0) { + /* send any window updates */ + if (stream_writing->announce_window > 0 && + stream_writing->send_initial_metadata == NULL) { + gpr_uint32 announce = stream_writing->announce_window; gpr_slice_buffer_add( &transport_writing->outbuf, grpc_chttp2_window_update_create(stream_writing->id, stream_writing->announce_window)); - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( - "write", transport_writing, stream_writing, announce_window, - -(gpr_int64)stream_writing->announce_window); + GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", transport_writing, stream_writing, + announce_window, announce); stream_writing->announce_window = 0; } - if (stream_writing->send_closed == GRPC_SEND_CLOSED_WITH_RST_STREAM) { - gpr_slice_buffer_add(&transport_writing->outbuf, - grpc_chttp2_rst_stream_create(stream_writing->id, - GRPC_CHTTP2_NO_ERROR)); + /* fetch any body bytes */ + while (!stream_writing->fetching && stream_writing->send_message && + stream_writing->flow_controlled_buffer.length < max_outgoing && + stream_writing->stream_fetched < + stream_writing->send_message->length) { + if (grpc_byte_stream_next(exec_ctx, stream_writing->send_message, + &stream_writing->fetching_slice, max_outgoing, + &stream_writing->finished_fetch)) { + stream_writing->stream_fetched += + GPR_SLICE_LENGTH(stream_writing->fetching_slice); + if (stream_writing->stream_fetched == + stream_writing->send_message->length) { + stream_writing->send_message = NULL; + } + gpr_slice_buffer_add(&stream_writing->flow_controlled_buffer, + stream_writing->fetching_slice); + } else { + stream_writing->fetching = 1; + } + } + /* send any body bytes */ + if (stream_writing->flow_controlled_buffer.length > 0) { + if (max_outgoing > 0) { + gpr_uint32 send_bytes = (gpr_uint32)GPR_MIN( + max_outgoing, stream_writing->flow_controlled_buffer.length); + int is_last_data_frame = + stream_writing->send_message == NULL && + send_bytes == stream_writing->flow_controlled_buffer.length; + int is_last_frame = is_last_data_frame && + stream_writing->send_trailing_metadata != NULL && + grpc_metadata_batch_is_empty( + stream_writing->send_trailing_metadata); + grpc_chttp2_encode_data( + stream_writing->id, &stream_writing->flow_controlled_buffer, + send_bytes, is_last_frame, &transport_writing->outbuf); + GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", transport_writing, + stream_writing, outgoing_window, + send_bytes); + GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", transport_writing, + outgoing_window, send_bytes); + if (is_last_frame) { + stream_writing->send_trailing_metadata = NULL; + stream_writing->sent_trailing_metadata = 1; + } + if (is_last_data_frame) { + GPR_ASSERT(stream_writing->send_message == NULL); + stream_writing->sent_message = 1; + } + } else if (transport_writing->outgoing_window == 0) { + grpc_chttp2_list_add_stalled_by_transport(transport_writing, + stream_writing); + grpc_chttp2_list_add_written_stream(transport_writing, stream_writing); + } + } + /* send trailing metadata if it's available and we're ready for it */ + if (stream_writing->send_message == NULL && + stream_writing->flow_controlled_buffer.length == 0 && + stream_writing->send_trailing_metadata != NULL) { + if (grpc_metadata_batch_is_empty( + stream_writing->send_trailing_metadata)) { + grpc_chttp2_encode_data(stream_writing->id, + &stream_writing->flow_controlled_buffer, 0, 1, + &transport_writing->outbuf); + } else { + grpc_chttp2_encode_header(&transport_writing->hpack_compressor, + stream_writing->id, + stream_writing->send_trailing_metadata, 1, + &transport_writing->outbuf); + } + if (!transport_writing->is_client && !stream_writing->read_closed) { + gpr_slice_buffer_add(&transport_writing->outbuf, + grpc_chttp2_rst_stream_create( + stream_writing->id, GRPC_CHTTP2_NO_ERROR)); + } + stream_writing->send_trailing_metadata = NULL; + stream_writing->sent_trailing_metadata = 1; + } + /* if there's more to write, then loop, otherwise prepare to finish the + * write */ + if ((stream_writing->flow_controlled_buffer.length > 0 || + (stream_writing->send_message && !stream_writing->fetching)) && + stream_writing->outgoing_window > 0) { + if (transport_writing->outgoing_window > 0) { + if (grpc_chttp2_list_add_writing_stream(transport_writing, + stream_writing)) { + /* do nothing - already reffed */ + } + } else { + grpc_chttp2_list_add_stalled_by_transport(transport_writing, + stream_writing); + grpc_chttp2_list_add_written_stream(transport_writing, stream_writing); + } + } else { + grpc_chttp2_list_add_written_stream(transport_writing, stream_writing); } - grpc_chttp2_list_add_written_stream(transport_writing, stream_writing); } - - GPR_TIMER_END("finalize_outbuf", 0); } void grpc_chttp2_cleanup_writing( @@ -223,24 +325,26 @@ void grpc_chttp2_cleanup_writing( while (grpc_chttp2_list_pop_written_stream( transport_global, transport_writing, &stream_global, &stream_writing)) { - GPR_ASSERT(stream_global->writing_now != 0); - if (stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) { - stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE; - if (!transport_global->is_client) { - stream_global->read_closed = 1; - } + if (stream_writing->sent_trailing_metadata) { + grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, + !transport_global->is_client, 1); } - if (stream_global->writing_now & GRPC_CHTTP2_WRITING_DATA) { - if (stream_global->outgoing_sopb != NULL && - stream_global->outgoing_sopb->nops == 0) { - GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_QUEUED_CLOSE); - stream_global->outgoing_sopb = NULL; - grpc_exec_ctx_enqueue(exec_ctx, stream_global->send_done_closure, 1); - } + if (stream_writing->sent_initial_metadata) { + grpc_chttp2_complete_closure_step( + exec_ctx, &stream_global->send_initial_metadata_finished, 1); + } + if (stream_writing->sent_message) { + GPR_ASSERT(stream_writing->send_message == NULL); + GPR_ASSERT(stream_global->send_message_finished); + grpc_chttp2_complete_closure_step( + exec_ctx, &stream_global->send_message_finished, 1); + stream_writing->sent_message = 0; + } + if (stream_writing->sent_trailing_metadata) { + grpc_chttp2_complete_closure_step( + exec_ctx, &stream_global->send_trailing_metadata_finished, 1); } - stream_global->writing_now = 0; - grpc_chttp2_list_add_read_write_state_changed(transport_global, - stream_global); + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing"); } gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf); } diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index effc3c4b3b..10f52f4923 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -52,6 +52,7 @@ #include "src/core/transport/transport_impl.h" #define DEFAULT_WINDOW 65535 +#define GRPC_CHTTP2_STREAM_LOOKAHEAD DEFAULT_WINDOW #define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024) #define MAX_WINDOW 0x7fffffffu @@ -75,14 +76,14 @@ int grpc_flowctl_trace = 0; #define STREAM_FROM_GLOBAL(sg) \ ((grpc_chttp2_stream *)((char *)(sg)-offsetof(grpc_chttp2_stream, global))) +#define STREAM_FROM_PARSING(sg) \ + ((grpc_chttp2_stream *)((char *)(sg)-offsetof(grpc_chttp2_stream, parsing))) + static const grpc_transport_vtable vtable; static void lock(grpc_chttp2_transport *t); static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t); -static void unlock_check_read_write_state(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t); - /* forward declarations of various callbacks that we'll build closures around */ static void writing_action(grpc_exec_ctx *exec_ctx, void *t, int iomgr_success_ignored); @@ -103,11 +104,13 @@ static void perform_stream_op_locked( grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op); /** Cancel a stream: coming from the transport API */ -static void cancel_from_api(grpc_chttp2_transport_global *transport_global, +static void cancel_from_api(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_status_code status); -static void close_from_api(grpc_chttp2_transport_global *transport_global, +static void close_from_api(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_status_code status, gpr_slice *optional_message); @@ -128,6 +131,9 @@ static void connectivity_state_set( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_connectivity_state state, const char *reason); +static void check_read_ops(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global); + /* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ @@ -151,6 +157,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_goaway_parser_destroy(&t->parsing.goaway_parser); GRPC_MDSTR_UNREF(t->parsing.str_grpc_timeout); + GRPC_MDELEM_UNREF(t->parsing.elem_grpc_status_ok); for (i = 0; i < STREAM_LIST_COUNT; i++) { GPR_ASSERT(t->lists[i].head == NULL); @@ -236,14 +243,16 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->endpoint_reading = 1; t->global.next_stream_id = is_client ? 1 : 2; t->global.is_client = is_client; - t->global.outgoing_window = DEFAULT_WINDOW; - t->global.incoming_window = DEFAULT_WINDOW; + t->writing.outgoing_window = DEFAULT_WINDOW; + t->parsing.incoming_window = DEFAULT_WINDOW; t->global.connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET; t->global.ping_counter = 1; t->global.pings.next = t->global.pings.prev = &t->global.pings; t->parsing.is_client = is_client; t->parsing.str_grpc_timeout = grpc_mdstr_from_string(t->metadata_context, "grpc-timeout"); + t->parsing.elem_grpc_status_ok = + grpc_mdelem_from_strings(t->metadata_context, "grpc-status", "0"); t->parsing.deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; t->writing.is_client = is_client; @@ -392,19 +401,46 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, } } +#ifdef GRPC_STREAM_REFCOUNT_DEBUG +void grpc_chttp2_stream_ref(grpc_chttp2_stream_global *stream_global, + const char *reason) { + grpc_stream_ref(STREAM_FROM_GLOBAL(stream_global)->refcount, reason); +} +void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, + grpc_chttp2_stream_global *stream_global, + const char *reason) { + grpc_stream_unref(exec_ctx, STREAM_FROM_GLOBAL(stream_global)->refcount, + reason); +} +#else +void grpc_chttp2_stream_ref(grpc_chttp2_stream_global *stream_global) { + grpc_stream_ref(STREAM_FROM_GLOBAL(stream_global)->refcount); +} +void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, + grpc_chttp2_stream_global *stream_global) { + grpc_stream_unref(exec_ctx, STREAM_FROM_GLOBAL(stream_global)->refcount); +} +#endif + static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, const void *server_data, - grpc_transport_stream_op *initial_op) { + grpc_stream *gs, grpc_stream_refcount *refcount, + const void *server_data) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; memset(s, 0, sizeof(*s)); - grpc_chttp2_incoming_metadata_buffer_init(&s->parsing.incoming_metadata); - grpc_chttp2_incoming_metadata_buffer_init(&s->global.incoming_metadata); - grpc_sopb_init(&s->writing.sopb); - grpc_sopb_init(&s->global.incoming_sopb); + s->refcount = refcount; + GRPC_CHTTP2_STREAM_REF(&s->global, "chttp2"); + + grpc_chttp2_incoming_metadata_buffer_init(&s->parsing.metadata_buffer[0]); + grpc_chttp2_incoming_metadata_buffer_init(&s->parsing.metadata_buffer[1]); + grpc_chttp2_incoming_metadata_buffer_init( + &s->global.received_initial_metadata); + grpc_chttp2_incoming_metadata_buffer_init( + &s->global.received_trailing_metadata); grpc_chttp2_data_parser_init(&s->parsing.data_parser); + gpr_slice_buffer_init(&s->writing.flow_controlled_buffer); REF_TRANSPORT(t, "stream"); @@ -413,20 +449,17 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, if (server_data) { GPR_ASSERT(t->parsing_active); s->global.id = (gpr_uint32)(gpr_uintptr)server_data; + s->parsing.id = s->global.id; s->global.outgoing_window = t->global.settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - s->global.max_recv_bytes = s->parsing.incoming_window = - s->global.incoming_window = - t->global.settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + s->parsing.incoming_window = s->global.max_recv_bytes = + t->global.settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; *t->accepting_stream = s; grpc_chttp2_stream_map_add(&t->parsing_stream_map, s->global.id, s); s->global.in_stream_map = 1; } - - if (initial_op) - perform_stream_op_locked(exec_ctx, &t->global, &s->global, initial_op); unlock(exec_ctx, t); return 0; @@ -437,10 +470,13 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; int i; + grpc_byte_stream *bs; + + GPR_TIMER_BEGIN("destroy_stream", 0); gpr_mu_lock(&t->mu); - GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED || + GPR_ASSERT((s->global.write_closed && s->global.read_closed) || s->global.id == 0); GPR_ASSERT(!s->global.in_stream_map); if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { @@ -451,8 +487,9 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->global.id) == NULL); } - grpc_chttp2_list_remove_incoming_window_updated(&t->global, &s->global); grpc_chttp2_list_remove_writable_stream(&t->global, &s->global); + grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global, + &s->global); gpr_mu_unlock(&t->mu); @@ -464,17 +501,29 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, } } - GPR_ASSERT(s->global.outgoing_sopb == NULL); - GPR_ASSERT(s->global.publish_sopb == NULL); - grpc_sopb_destroy(&s->writing.sopb); - grpc_sopb_destroy(&s->global.incoming_sopb); + while ( + (bs = grpc_chttp2_incoming_frame_queue_pop(&s->global.incoming_frames))) { + grpc_byte_stream_destroy(bs); + } + + GPR_ASSERT(s->global.send_initial_metadata_finished == NULL); + GPR_ASSERT(s->global.send_message_finished == NULL); + GPR_ASSERT(s->global.send_trailing_metadata_finished == NULL); + GPR_ASSERT(s->global.recv_initial_metadata_finished == NULL); + GPR_ASSERT(s->global.recv_message_ready == NULL); + GPR_ASSERT(s->global.recv_trailing_metadata_finished == NULL); grpc_chttp2_data_parser_destroy(&s->parsing.data_parser); - grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.incoming_metadata); - grpc_chttp2_incoming_metadata_buffer_destroy(&s->global.incoming_metadata); - grpc_chttp2_incoming_metadata_live_op_buffer_end( - &s->global.outstanding_metadata); + grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.metadata_buffer[0]); + grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.metadata_buffer[1]); + grpc_chttp2_incoming_metadata_buffer_destroy( + &s->global.received_initial_metadata); + grpc_chttp2_incoming_metadata_buffer_destroy( + &s->global.received_trailing_metadata); + gpr_slice_buffer_destroy(&s->writing.flow_controlled_buffer); UNREF_TRANSPORT(exec_ctx, t, "stream"); + + GPR_TIMER_END("destroy_stream", 0); } grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream( @@ -486,12 +535,14 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream( } grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( - grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id) { + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, + gpr_uint32 id) { grpc_chttp2_stream *accepting; grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing); GPR_ASSERT(t->accepting_stream == NULL); t->accepting_stream = &accepting; - t->channel_callback.accept_stream(t->channel_callback.accept_stream_user_data, + t->channel_callback.accept_stream(exec_ctx, + t->channel_callback.accept_stream_user_data, &t->base, (void *)(gpr_uintptr)id); t->accepting_stream = NULL; return &accepting->parsing; @@ -511,7 +562,6 @@ static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); } static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { GPR_TIMER_BEGIN("unlock", 0); - unlock_check_read_write_state(exec_ctx, t); if (!t->writing_active && !t->closed && grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) { t->writing_active = 1; @@ -519,6 +569,7 @@ static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { grpc_exec_ctx_enqueue(exec_ctx, &t->writing_action, 1); prevent_endpoint_shutdown(t); } + check_read_ops(exec_ctx, &t->global); gpr_mu_unlock(&t->mu); GPR_TIMER_END("unlock", 0); @@ -558,7 +609,6 @@ void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx, drop_connection(exec_ctx, t); } - /* cleanup writing related jazz */ grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing); /* leave the writing flag up on shutdown to prevent further writes in unlock() @@ -598,6 +648,7 @@ void grpc_chttp2_add_incoming_goaway( static void maybe_start_some_streams( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global) { grpc_chttp2_stream_global *stream_global; + gpr_uint32 stream_incoming_window; /* start streams where we have free grpc_chttp2_stream ids and free * concurrency */ while (transport_global->next_stream_id <= MAX_CLIENT_STREAM_ID && @@ -607,13 +658,16 @@ static void maybe_start_some_streams( [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] && grpc_chttp2_list_pop_waiting_for_concurrency(transport_global, &stream_global)) { + /* safe since we can't (legally) be parsing this stream yet */ + grpc_chttp2_stream_parsing *stream_parsing = + &STREAM_FROM_GLOBAL(stream_global)->parsing; GRPC_CHTTP2_IF_TRACING(gpr_log( GPR_DEBUG, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d", transport_global->is_client ? "CLI" : "SVR", stream_global, transport_global->next_stream_id)); GPR_ASSERT(stream_global->id == 0); - stream_global->id = transport_global->next_stream_id; + stream_global->id = stream_parsing->id = transport_global->next_stream_id; transport_global->next_stream_id += 2; if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) { @@ -625,103 +679,166 @@ static void maybe_start_some_streams( stream_global->outgoing_window = transport_global->settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - stream_global->incoming_window = + stream_parsing->incoming_window = stream_incoming_window = transport_global->settings[GRPC_SENT_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; stream_global->max_recv_bytes = - GPR_MAX(stream_global->incoming_window, stream_global->max_recv_bytes); + GPR_MAX(stream_incoming_window, stream_global->max_recv_bytes); grpc_chttp2_stream_map_add( &TRANSPORT_FROM_GLOBAL(transport_global)->new_stream_map, stream_global->id, STREAM_FROM_GLOBAL(stream_global)); stream_global->in_stream_map = 1; transport_global->concurrent_stream_count++; - grpc_chttp2_list_add_incoming_window_updated(transport_global, - stream_global); grpc_chttp2_list_add_writable_stream(transport_global, stream_global); } /* cancel out streams that will never be started */ while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID && grpc_chttp2_list_pop_waiting_for_concurrency(transport_global, &stream_global)) { - cancel_from_api(transport_global, stream_global, GRPC_STATUS_UNAVAILABLE); + cancel_from_api(exec_ctx, transport_global, stream_global, + GRPC_STATUS_UNAVAILABLE); + } +} + +static grpc_closure *add_closure_barrier(grpc_closure *closure) { + closure->final_data += 2; + return closure; +} + +void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, + grpc_closure **pclosure, int success) { + grpc_closure *closure = *pclosure; + if (closure == NULL) { + return; + } + closure->final_data -= 2; + if (!success) { + closure->final_data |= 1; + } + if (closure->final_data < 2) { + grpc_exec_ctx_enqueue(exec_ctx, closure, closure->final_data == 0); } + *pclosure = NULL; +} + +static int contains_non_ok_status( + grpc_chttp2_transport_global *transport_global, + grpc_metadata_batch *batch) { + grpc_mdelem *ok_elem = + TRANSPORT_FROM_GLOBAL(transport_global)->parsing.elem_grpc_status_ok; + grpc_linked_mdelem *l; + for (l = batch->list.head; l; l = l->next) { + if (l->md->key == ok_elem->key && l->md != ok_elem) { + return 1; + } + } + return 0; } static void perform_stream_op_locked( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op) { + grpc_closure *on_complete; + GPR_TIMER_BEGIN("perform_stream_op_locked", 0); + + on_complete = op->on_complete; + /* use final_data as a barrier until enqueue time; the inital counter is + dropped at the end of this function */ + on_complete->final_data = 2; + if (op->cancel_with_status != GRPC_STATUS_OK) { - cancel_from_api(transport_global, stream_global, op->cancel_with_status); + cancel_from_api(exec_ctx, transport_global, stream_global, + op->cancel_with_status); } if (op->close_with_status != GRPC_STATUS_OK) { - close_from_api(transport_global, stream_global, op->close_with_status, - op->optional_close_message); - } - - if (op->send_ops) { - GPR_ASSERT(stream_global->outgoing_sopb == NULL); - stream_global->send_done_closure = op->on_done_send; - if (!stream_global->cancelled) { - stream_global->written_anything = 1; - stream_global->outgoing_sopb = op->send_ops; - if (op->is_last_send && - stream_global->write_state == GRPC_WRITE_STATE_OPEN) { - stream_global->write_state = GRPC_WRITE_STATE_QUEUED_CLOSE; - } - if (stream_global->id == 0) { - GRPC_CHTTP2_IF_TRACING(gpr_log( - GPR_DEBUG, - "HTTP:%s: New grpc_chttp2_stream %p waiting for concurrency", - transport_global->is_client ? "CLI" : "SVR", stream_global)); + close_from_api(exec_ctx, transport_global, stream_global, + op->close_with_status, op->optional_close_message); + } + + if (op->send_initial_metadata != NULL) { + GPR_ASSERT(stream_global->send_initial_metadata_finished == NULL); + stream_global->send_initial_metadata_finished = + add_closure_barrier(on_complete); + stream_global->send_initial_metadata = op->send_initial_metadata; + if (contains_non_ok_status(transport_global, op->send_initial_metadata)) { + stream_global->seen_error = 1; + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + } + if (!stream_global->write_closed) { + if (transport_global->is_client) { + GPR_ASSERT(stream_global->id == 0); grpc_chttp2_list_add_waiting_for_concurrency(transport_global, stream_global); maybe_start_some_streams(exec_ctx, transport_global); - } else if (stream_global->outgoing_window > 0) { + } else { + GPR_ASSERT(stream_global->id != 0); grpc_chttp2_list_add_writable_stream(transport_global, stream_global); } } else { - grpc_sopb_reset(op->send_ops); - grpc_exec_ctx_enqueue(exec_ctx, stream_global->send_done_closure, 0); + grpc_chttp2_complete_closure_step( + exec_ctx, &stream_global->send_initial_metadata_finished, 0); } } - if (op->recv_ops) { - GPR_ASSERT(stream_global->publish_sopb == NULL); - GPR_ASSERT(stream_global->published_state != GRPC_STREAM_CLOSED); - stream_global->recv_done_closure = op->on_done_recv; - stream_global->publish_sopb = op->recv_ops; - stream_global->publish_sopb->nops = 0; - stream_global->publish_state = op->recv_state; - /* clamp max recv bytes */ - op->max_recv_bytes = GPR_MIN(op->max_recv_bytes, GPR_UINT32_MAX); - if (stream_global->max_recv_bytes < op->max_recv_bytes) { - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( - "op", transport_global, stream_global, max_recv_bytes, - op->max_recv_bytes - stream_global->max_recv_bytes); - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( - "op", transport_global, stream_global, unannounced_incoming_window, - op->max_recv_bytes - stream_global->max_recv_bytes); - stream_global->unannounced_incoming_window += - (gpr_uint32)op->max_recv_bytes - stream_global->max_recv_bytes; - stream_global->max_recv_bytes = (gpr_uint32)op->max_recv_bytes; + if (op->send_message != NULL) { + GPR_ASSERT(stream_global->send_message_finished == NULL); + GPR_ASSERT(stream_global->send_message == NULL); + stream_global->send_message_finished = add_closure_barrier(on_complete); + if (stream_global->write_closed) { + grpc_chttp2_complete_closure_step( + exec_ctx, &stream_global->send_message_finished, 0); + } else if (stream_global->id != 0) { + stream_global->send_message = op->send_message; + grpc_chttp2_list_add_writable_stream(transport_global, stream_global); } - grpc_chttp2_incoming_metadata_live_op_buffer_end( - &stream_global->outstanding_metadata); - grpc_chttp2_list_add_read_write_state_changed(transport_global, - stream_global); - if (stream_global->id != 0) { + } + + if (op->send_trailing_metadata != NULL) { + GPR_ASSERT(stream_global->send_trailing_metadata_finished == NULL); + stream_global->send_trailing_metadata_finished = + add_closure_barrier(on_complete); + stream_global->send_trailing_metadata = op->send_trailing_metadata; + if (contains_non_ok_status(transport_global, op->send_trailing_metadata)) { + stream_global->seen_error = 1; + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + } + if (stream_global->write_closed) { + grpc_chttp2_complete_closure_step( + exec_ctx, &stream_global->send_trailing_metadata_finished, 0); + } else if (stream_global->id != 0) { + /* TODO(ctiller): check if there's flow control for any outstanding + bytes before going writable */ grpc_chttp2_list_add_writable_stream(transport_global, stream_global); } } - if (op->bind_pollset) { - add_to_pollset_locked(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global), - op->bind_pollset); + if (op->recv_initial_metadata != NULL) { + GPR_ASSERT(stream_global->recv_initial_metadata_finished == NULL); + stream_global->recv_initial_metadata_finished = + add_closure_barrier(on_complete); + stream_global->recv_initial_metadata = op->recv_initial_metadata; + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } - grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1); + if (op->recv_message != NULL) { + GPR_ASSERT(stream_global->recv_message_ready == NULL); + stream_global->recv_message_ready = op->recv_message_ready; + stream_global->recv_message = op->recv_message; + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + } + + if (op->recv_trailing_metadata != NULL) { + GPR_ASSERT(stream_global->recv_trailing_metadata_finished == NULL); + stream_global->recv_trailing_metadata_finished = + add_closure_barrier(on_complete); + stream_global->recv_trailing_metadata = op->recv_trailing_metadata; + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + } + + grpc_chttp2_complete_closure_step(exec_ctx, &on_complete, 1); + GPR_TIMER_END("perform_stream_op_locked", 0); } @@ -811,12 +928,49 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, * INPUT PROCESSING */ -static grpc_stream_state compute_state(gpr_uint8 write_closed, - gpr_uint8 read_closed) { - if (write_closed && read_closed) return GRPC_STREAM_CLOSED; - if (write_closed) return GRPC_STREAM_SEND_CLOSED; - if (read_closed) return GRPC_STREAM_RECV_CLOSED; - return GRPC_STREAM_OPEN; +static void check_read_ops(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global) { + grpc_chttp2_stream_global *stream_global; + grpc_byte_stream *bs; + while ( + grpc_chttp2_list_pop_check_read_ops(transport_global, &stream_global)) { + if (stream_global->recv_initial_metadata_finished != NULL && + stream_global->published_initial_metadata) { + grpc_chttp2_incoming_metadata_buffer_publish( + &stream_global->received_initial_metadata, + stream_global->recv_initial_metadata); + grpc_chttp2_complete_closure_step( + exec_ctx, &stream_global->recv_initial_metadata_finished, 1); + } + if (stream_global->recv_message_ready != NULL) { + if (stream_global->incoming_frames.head != NULL) { + *stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop( + &stream_global->incoming_frames); + GPR_ASSERT(*stream_global->recv_message != NULL); + grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, 1); + stream_global->recv_message_ready = NULL; + } else if (stream_global->published_trailing_metadata) { + *stream_global->recv_message = NULL; + grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, 1); + stream_global->recv_message_ready = NULL; + } + } + if (stream_global->recv_trailing_metadata_finished != NULL && + stream_global->read_closed && stream_global->write_closed) { + while (stream_global->seen_error && + (bs = grpc_chttp2_incoming_frame_queue_pop( + &stream_global->incoming_frames)) != NULL) { + grpc_byte_stream_destroy(bs); + } + if (stream_global->incoming_frames.head == NULL) { + grpc_chttp2_incoming_metadata_buffer_publish( + &stream_global->received_trailing_metadata, + stream_global->recv_trailing_metadata); + grpc_chttp2_complete_closure_step( + exec_ctx, &stream_global->recv_trailing_metadata_finished, 1); + } + } + } } static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -832,7 +986,7 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, s->global.in_stream_map = 0; if (t->parsing.incoming_stream == &s->parsing) { t->parsing.incoming_stream = NULL; - grpc_chttp2_parsing_become_skip_parser(&t->parsing); + grpc_chttp2_parsing_become_skip_parser(exec_ctx, &t->parsing); } if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { close_transport_locked(exec_ctx, t); @@ -847,109 +1001,10 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } -static void unlock_check_read_write_state(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t) { - grpc_chttp2_transport_global *transport_global = &t->global; - grpc_chttp2_stream_global *stream_global; - grpc_stream_state state; - - if (!t->parsing_active) { - /* if a stream is in the stream map, and gets cancelled, we need to ensure - we are not parsing before continuing the cancellation to keep things in - a sane state */ - while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global, - &stream_global)) { - GPR_ASSERT(stream_global->in_stream_map); - GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_OPEN); - GPR_ASSERT(stream_global->read_closed); - remove_stream(exec_ctx, t, stream_global->id); - grpc_chttp2_list_add_read_write_state_changed(transport_global, - stream_global); - } - } - - if (!t->writing_active) { - while (grpc_chttp2_list_pop_cancelled_waiting_for_writing(transport_global, - &stream_global)) { - grpc_chttp2_list_add_read_write_state_changed(transport_global, - stream_global); - } - } - - while (grpc_chttp2_list_pop_read_write_state_changed(transport_global, - &stream_global)) { - if (stream_global->cancelled) { - if (t->writing_active && - stream_global->write_state != GRPC_WRITE_STATE_SENT_CLOSE) { - grpc_chttp2_list_add_cancelled_waiting_for_writing(transport_global, - stream_global); - } else { - stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE; - if (stream_global->outgoing_sopb != NULL) { - grpc_sopb_reset(stream_global->outgoing_sopb); - stream_global->outgoing_sopb = NULL; - grpc_exec_ctx_enqueue(exec_ctx, stream_global->send_done_closure, 1); - } - stream_global->read_closed = 1; - if (!stream_global->published_cancelled) { - char buffer[GPR_LTOA_MIN_BUFSIZE]; - gpr_ltoa(stream_global->cancelled_status, buffer); - grpc_chttp2_incoming_metadata_buffer_add( - &stream_global->incoming_metadata, - grpc_mdelem_from_strings(t->metadata_context, "grpc-status", - buffer)); - grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into( - &stream_global->incoming_metadata, &stream_global->incoming_sopb); - stream_global->published_cancelled = 1; - } - } - } - if (stream_global->write_state == GRPC_WRITE_STATE_SENT_CLOSE && - stream_global->read_closed && stream_global->in_stream_map) { - if (t->parsing_active) { - grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global, - stream_global); - } else { - remove_stream(exec_ctx, t, stream_global->id); - } - } - if (!stream_global->publish_sopb) { - continue; - } - if (stream_global->writing_now != 0) { - continue; - } - /* FIXME(ctiller): we include in_stream_map in our computation of - whether the stream is write-closed. This is completely bogus, - but has the effect of delaying stream-closed until the stream - is indeed evicted from the stream map, making it safe to delete. - To fix this will require having an edge after stream-closed - indicating that the stream is closed AND safe to delete. */ - state = compute_state( - stream_global->write_state == GRPC_WRITE_STATE_SENT_CLOSE && - !stream_global->in_stream_map, - stream_global->read_closed); - if (stream_global->incoming_sopb.nops == 0 && - state == stream_global->published_state) { - continue; - } - grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op( - &stream_global->incoming_metadata, &stream_global->incoming_sopb, - &stream_global->outstanding_metadata); - grpc_sopb_swap(stream_global->publish_sopb, &stream_global->incoming_sopb); - stream_global->published_state = *stream_global->publish_state = state; - grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_done_closure, 1); - stream_global->recv_done_closure = NULL; - stream_global->publish_sopb = NULL; - stream_global->publish_state = NULL; - } -} - -static void cancel_from_api(grpc_chttp2_transport_global *transport_global, +static void cancel_from_api(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_status_code status) { - stream_global->cancelled = 1; - stream_global->cancelled_status = status; if (stream_global->id != 0) { gpr_slice_buffer_add( &transport_global->qbuf, @@ -957,11 +1012,89 @@ static void cancel_from_api(grpc_chttp2_transport_global *transport_global, stream_global->id, (gpr_uint32)grpc_chttp2_grpc_status_to_http2_error(status))); } - grpc_chttp2_list_add_read_write_state_changed(transport_global, - stream_global); + grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status, + NULL); + grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1, + 1); +} + +void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, + grpc_status_code status, gpr_slice *slice) { + if (status != GRPC_STATUS_OK) { + stream_global->seen_error = 1; + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + } + if (stream_global->published_trailing_metadata && + stream_global->recv_trailing_metadata_finished != NULL) { + /* last chance replacement: we've received trailing metadata, + but something more important has become available to signal + to the upper layers - drop what we've got, and then publish + what we want - which is safe because we haven't told anyone + about the metadata yet */ + grpc_chttp2_incoming_metadata_buffer_reset( + &stream_global->received_trailing_metadata); + stream_global->published_trailing_metadata = 0; + } + if (!stream_global->published_trailing_metadata) { + grpc_mdctx *mdctx = + TRANSPORT_FROM_GLOBAL(transport_global)->metadata_context; + char status_string[GPR_LTOA_MIN_BUFSIZE]; + gpr_ltoa(status, status_string); + grpc_chttp2_incoming_metadata_buffer_add( + &stream_global->received_trailing_metadata, + grpc_mdelem_from_strings(mdctx, "grpc-status", status_string)); + if (slice) { + grpc_chttp2_incoming_metadata_buffer_add( + &stream_global->received_trailing_metadata, + grpc_mdelem_from_metadata_strings( + mdctx, grpc_mdstr_from_string(mdctx, "grpc-message"), + grpc_mdstr_from_slice(mdctx, gpr_slice_ref(*slice)))); + } + stream_global->published_trailing_metadata = 1; + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + } + if (slice) { + gpr_slice_unref(*slice); + } +} + +void grpc_chttp2_mark_stream_closed( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, int close_reads, + int close_writes) { + if (stream_global->read_closed && stream_global->write_closed) { + /* already closed */ + return; + } + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + if (close_reads && !stream_global->read_closed) { + stream_global->read_closed = 1; + stream_global->published_initial_metadata = 1; + stream_global->published_trailing_metadata = 1; + } + if (close_writes && !stream_global->write_closed) { + stream_global->write_closed = 1; + } + if (stream_global->read_closed && stream_global->write_closed) { + if (stream_global->id != 0 && + TRANSPORT_FROM_GLOBAL(transport_global)->parsing_active) { + grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global, + stream_global); + } else { + if (stream_global->id != 0) { + remove_stream(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global), + stream_global->id); + } + stream_global->finished_close = 1; + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); + } + } } -static void close_from_api(grpc_chttp2_transport_global *transport_global, +static void close_from_api(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_status_code status, gpr_slice *optional_message) { @@ -973,10 +1106,7 @@ static void close_from_api(grpc_chttp2_transport_global *transport_global, GPR_ASSERT(status >= 0 && (int)status < 100); - stream_global->cancelled = 1; - stream_global->cancelled_status = status; GPR_ASSERT(stream_global->id != 0); - GPR_ASSERT(!stream_global->written_anything); /* Hand roll a header block. This is unnecessarily ugly - at some point we should find a more elegant @@ -1059,23 +1189,30 @@ static void close_from_api(grpc_chttp2_transport_global *transport_global, &transport_global->qbuf, grpc_chttp2_rst_stream_create(stream_global->id, GRPC_CHTTP2_NO_ERROR)); - grpc_chttp2_list_add_read_write_state_changed(transport_global, - stream_global); + if (optional_message) { + gpr_slice_ref(*optional_message); + } + grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status, + optional_message); + grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1, + 1); } static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global, void *user_data, grpc_chttp2_stream_global *stream_global) { - cancel_from_api(transport_global, stream_global, GRPC_STATUS_UNAVAILABLE); + cancel_from_api(user_data, transport_global, stream_global, + GRPC_STATUS_UNAVAILABLE); } -static void end_all_the_calls(grpc_chttp2_transport *t) { - grpc_chttp2_for_all_streams(&t->global, NULL, cancel_stream_cb); +static void end_all_the_calls(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t) { + grpc_chttp2_for_all_streams(&t->global, exec_ctx, cancel_stream_cb); } static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { close_transport_locked(exec_ctx, t); - end_all_the_calls(t); + end_all_the_calls(exec_ctx, t); } /** update window from a settings change */ @@ -1086,12 +1223,11 @@ static void update_global_window(void *args, gpr_uint32 id, void *stream) { grpc_chttp2_stream_global *stream_global = &s->global; int was_zero; int is_zero; + gpr_int64 initial_window_update = t->parsing.initial_window_update; - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("settings", transport_global, stream_global, - outgoing_window, - t->parsing.initial_window_update); was_zero = stream_global->outgoing_window <= 0; - stream_global->outgoing_window += t->parsing.initial_window_update; + GRPC_CHTTP2_FLOW_CREDIT_STREAM("settings", transport_global, stream_global, + outgoing_window, initial_window_update); is_zero = stream_global->outgoing_window <= 0; if (was_zero && !is_zero) { @@ -1112,6 +1248,9 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success) { size_t i; int keep_reading = 0; grpc_chttp2_transport *t = tp; + grpc_chttp2_transport_global *transport_global = &t->global; + grpc_chttp2_transport_parsing *transport_parsing = &t->parsing; + grpc_chttp2_stream_global *stream_global; GPR_TIMER_BEGIN("recv_data", 0); @@ -1123,11 +1262,11 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success) { /* merge stream lists */ grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); - grpc_chttp2_prepare_to_read(&t->global, &t->parsing); + grpc_chttp2_prepare_to_read(transport_global, transport_parsing); gpr_mu_unlock(&t->mu); GPR_TIMER_BEGIN("recv_data.parse", 0); for (; i < t->read_buffer.count && - grpc_chttp2_perform_read(exec_ctx, &t->parsing, + grpc_chttp2_perform_read(exec_ctx, transport_parsing, t->read_buffer.slices[i]); i++) ; @@ -1139,16 +1278,28 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success) { /* merge stream lists */ grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); - t->global.concurrent_stream_count = + transport_global->concurrent_stream_count = (gpr_uint32)grpc_chttp2_stream_map_size(&t->parsing_stream_map); - if (t->parsing.initial_window_update != 0) { + if (transport_parsing->initial_window_update != 0) { grpc_chttp2_stream_map_for_each(&t->parsing_stream_map, update_global_window, t); - t->parsing.initial_window_update = 0; + transport_parsing->initial_window_update = 0; } /* handle higher level things */ - grpc_chttp2_publish_reads(exec_ctx, &t->global, &t->parsing); + grpc_chttp2_publish_reads(exec_ctx, transport_global, transport_parsing); t->parsing_active = 0; + /* if a stream is in the stream map, and gets cancelled, we need to ensure + * we are not parsing before continuing the cancellation to keep things in + * a sane state */ + while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global, + &stream_global)) { + GPR_ASSERT(stream_global->in_stream_map); + GPR_ASSERT(stream_global->write_closed); + GPR_ASSERT(stream_global->read_closed); + remove_stream(exec_ctx, t, stream_global->id); + stream_global->finished_close = 1; + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); + } } if (!success || i != t->read_buffer.count || t->closed) { drop_connection(exec_ctx, t); @@ -1206,35 +1357,216 @@ static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx, } } +static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, grpc_pollset *pollset) { + grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; + lock(t); + add_to_pollset_locked(exec_ctx, t, pollset); + unlock(exec_ctx, t); +} + /* - * TRACING + * BYTE STREAM */ -void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason, - const char *context, const char *var, - int is_client, gpr_uint32 stream_id, - gpr_int64 current_value, gpr_int64 delta) { - char *identifier; - char *context_scope; - char *context_thread; - char *underscore_pos = strchr(context, '_'); - GPR_ASSERT(underscore_pos); - context_thread = gpr_strdup(underscore_pos + 1); - context_scope = gpr_strdup(context); - context_scope[underscore_pos - context] = 0; - if (stream_id) { - gpr_asprintf(&identifier, "%s[%d]", context_scope, stream_id); +static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + gpr_slice *slice, size_t max_size_hint, + grpc_closure *on_complete) { + grpc_chttp2_incoming_byte_stream *bs = + (grpc_chttp2_incoming_byte_stream *)byte_stream; + grpc_chttp2_transport_global *transport_global = &bs->transport->global; + grpc_chttp2_stream_global *stream_global = &bs->stream->global; + gpr_uint32 max_recv_bytes; + + lock(bs->transport); + if (bs->is_tail) { + /* clamp max recv hint to an allowable size */ + if (max_size_hint >= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD) { + max_recv_bytes = GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD; + } else { + max_recv_bytes = (gpr_uint32)max_size_hint; + } + + /* account for bytes already received but unknown to higher layers */ + if (max_recv_bytes >= bs->slices.length) { + max_recv_bytes -= (gpr_uint32)bs->slices.length; + } else { + max_recv_bytes = 0; + } + /* add some small lookahead to keep pipelines flowing */ + GPR_ASSERT(max_recv_bytes <= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD); + max_recv_bytes += GRPC_CHTTP2_STREAM_LOOKAHEAD; + if (stream_global->max_recv_bytes < max_recv_bytes) { + gpr_uint32 add_max_recv_bytes = + max_recv_bytes - stream_global->max_recv_bytes; + GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, + max_recv_bytes, add_max_recv_bytes); + GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, + unannounced_incoming_window_for_parse, + add_max_recv_bytes); + GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, + unannounced_incoming_window_for_writing, + add_max_recv_bytes); + grpc_chttp2_list_add_unannounced_incoming_window_available( + transport_global, stream_global); + grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + } + } + if (bs->slices.count > 0) { + *slice = gpr_slice_buffer_take_first(&bs->slices); + unlock(exec_ctx, bs->transport); + return 1; + } else { + bs->on_next = on_complete; + bs->next = slice; + unlock(exec_ctx, bs->transport); + return 0; + } +} + +static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream *bs) { + if (gpr_unref(&bs->refs)) { + gpr_slice_buffer_destroy(&bs->slices); + gpr_free(bs); + } +} + +static void incoming_byte_stream_destroy(grpc_byte_stream *byte_stream) { + incoming_byte_stream_unref((grpc_chttp2_incoming_byte_stream *)byte_stream); +} + +void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, + grpc_chttp2_incoming_byte_stream *bs, + gpr_slice slice) { + gpr_mu_lock(&bs->transport->mu); + if (bs->on_next != NULL) { + *bs->next = slice; + grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, 1); + bs->on_next = NULL; + } else { + gpr_slice_buffer_add(&bs->slices, slice); + } + gpr_mu_unlock(&bs->transport->mu); +} + +void grpc_chttp2_incoming_byte_stream_finished( + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs) { + incoming_byte_stream_unref(bs); +} + +grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( + grpc_chttp2_transport_parsing *transport_parsing, + grpc_chttp2_stream_parsing *stream_parsing, gpr_uint32 frame_size, + gpr_uint32 flags, grpc_chttp2_incoming_frame_queue *add_to_queue) { + grpc_chttp2_incoming_byte_stream *incoming_byte_stream = + gpr_malloc(sizeof(*incoming_byte_stream)); + incoming_byte_stream->base.length = frame_size; + incoming_byte_stream->base.flags = flags; + incoming_byte_stream->base.next = incoming_byte_stream_next; + incoming_byte_stream->base.destroy = incoming_byte_stream_destroy; + gpr_ref_init(&incoming_byte_stream->refs, 2); + incoming_byte_stream->next_message = NULL; + incoming_byte_stream->transport = TRANSPORT_FROM_PARSING(transport_parsing); + incoming_byte_stream->stream = STREAM_FROM_PARSING(stream_parsing); + gpr_slice_buffer_init(&incoming_byte_stream->slices); + incoming_byte_stream->on_next = NULL; + incoming_byte_stream->is_tail = 1; + if (add_to_queue->head == NULL) { + add_to_queue->head = incoming_byte_stream; } else { - identifier = gpr_strdup(context_scope); - } - gpr_log(GPR_INFO, - "FLOWCTL: %s %-10s %8s %-27s %8lld %c %8lld = %8lld %-10s [%s:%d]", - is_client ? "client" : "server", identifier, context_thread, var, - current_value, delta < 0 ? '-' : '+', delta < 0 ? -delta : delta, - current_value + delta, reason, file, line); - gpr_free(identifier); - gpr_free(context_thread); - gpr_free(context_scope); + add_to_queue->tail->is_tail = 0; + add_to_queue->tail->next_message = incoming_byte_stream; + } + add_to_queue->tail = incoming_byte_stream; + return incoming_byte_stream; +} + +/* + * TRACING + */ + +static char *format_flowctl_context_var(const char *context, const char *var, + gpr_int64 val, gpr_uint32 id, + char **scope) { + char *underscore_pos; + char *result; + if (context == NULL) { + *scope = NULL; + gpr_asprintf(&result, "%s(%lld)", var, val); + return result; + } + underscore_pos = strchr(context, '_'); + *scope = gpr_strdup(context); + (*scope)[underscore_pos - context] = 0; + if (id != 0) { + char *tmp = *scope; + gpr_asprintf(scope, "%s[%d]", tmp, id); + gpr_free(tmp); + } + gpr_asprintf(&result, "%s.%s(%lld)", underscore_pos + 1, var, val); + return result; +} + +static int samestr(char *a, char *b) { + if (a == NULL) { + return b == NULL; + } + if (b == NULL) { + return 0; + } + return 0 == strcmp(a, b); +} + +void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase, + grpc_chttp2_flowctl_op op, const char *context1, + const char *var1, const char *context2, + const char *var2, int is_client, + gpr_uint32 stream_id, gpr_int64 val1, + gpr_int64 val2) { + char *scope1; + char *scope2; + char *label1 = + format_flowctl_context_var(context1, var1, val1, stream_id, &scope1); + char *label2 = + format_flowctl_context_var(context2, var2, val2, stream_id, &scope2); + char *clisvr = is_client ? "client" : "server"; + char *prefix; + + gpr_asprintf(&prefix, "FLOW % 8s: %s % 11s ", phase, clisvr, scope1); + + switch (op) { + case GRPC_CHTTP2_FLOWCTL_MOVE: + GPR_ASSERT(samestr(scope1, scope2)); + if (val2 != 0) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "%sMOVE % 40s <- % 40s giving %d", prefix, label1, label2, + val1 + val2); + } + break; + case GRPC_CHTTP2_FLOWCTL_CREDIT: + GPR_ASSERT(val2 >= 0); + if (val2 != 0) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "%sCREDIT % 40s by % 40s giving %d", prefix, label1, label2, + val1 + val2); + } + break; + case GRPC_CHTTP2_FLOWCTL_DEBIT: + GPR_ASSERT(val2 >= 0); + if (val2 != 0) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "%sDEBIT % 40s by % 40s giving %d", prefix, label1, label2, + val1 - val2); + } + break; + } + + gpr_free(scope1); + gpr_free(scope2); + gpr_free(label1); + gpr_free(label2); + gpr_free(prefix); } /* @@ -1246,7 +1578,7 @@ static char *chttp2_get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *t) { } static const grpc_transport_vtable vtable = { - sizeof(grpc_chttp2_stream), init_stream, perform_stream_op, + sizeof(grpc_chttp2_stream), init_stream, set_pollset, perform_stream_op, perform_transport_op, destroy_stream, destroy_transport, chttp2_get_peer}; grpc_transport *grpc_create_chttp2_transport( diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c index 68f23177eb..a72dee4399 100644 --- a/src/core/transport/metadata.c +++ b/src/core/transport/metadata.c @@ -41,9 +41,10 @@ #include <grpc/support/alloc.h> #include <grpc/support/atm.h> #include <grpc/support/log.h> +#include <grpc/support/time.h> +#include "src/core/profiling/timers.h" #include "src/core/support/murmur_hash.h" #include "src/core/transport/chttp2/bin_encoder.h" -#include <grpc/support/time.h> #define INITIAL_STRTAB_CAPACITY 4 #define INITIAL_MDTAB_CAPACITY 4 @@ -232,24 +233,32 @@ static void metadata_context_destroy_locked(grpc_mdctx *ctx) { } void grpc_mdctx_ref(grpc_mdctx *ctx) { + GPR_TIMER_BEGIN("grpc_mdctx_ref", 0); lock(ctx); GPR_ASSERT(ctx->refs > 0); ctx->refs++; unlock(ctx); + GPR_TIMER_END("grpc_mdctx_ref", 0); } void grpc_mdctx_unref(grpc_mdctx *ctx) { + GPR_TIMER_BEGIN("grpc_mdctx_unref", 0); lock(ctx); GPR_ASSERT(ctx->refs > 0); ctx->refs--; unlock(ctx); + GPR_TIMER_END("grpc_mdctx_unref", 0); } static void grow_strtab(grpc_mdctx *ctx) { size_t capacity = ctx->strtab_capacity * 2; size_t i; - internal_string **strtab = gpr_malloc(sizeof(internal_string *) * capacity); + internal_string **strtab; internal_string *s, *next; + + GPR_TIMER_BEGIN("grow_strtab", 0); + + strtab = gpr_malloc(sizeof(internal_string *) * capacity); memset(strtab, 0, sizeof(internal_string *) * capacity); for (i = 0; i < ctx->strtab_capacity; i++) { @@ -263,12 +272,15 @@ static void grow_strtab(grpc_mdctx *ctx) { gpr_free(ctx->strtab); ctx->strtab = strtab; ctx->strtab_capacity = capacity; + + GPR_TIMER_END("grow_strtab", 0); } static void internal_destroy_string(internal_string *is) { internal_string **prev_next; internal_string *cur; grpc_mdctx *ctx = is->context; + GPR_TIMER_BEGIN("internal_destroy_string", 0); if (is->has_base64_and_huffman_encoded) { gpr_slice_unref(is->base64_and_huffman); } @@ -279,6 +291,7 @@ static void internal_destroy_string(internal_string *is) { *prev_next = cur->bucket_next; ctx->strtab_count--; gpr_free(is); + GPR_TIMER_END("internal_destroy_string", 0); } static void internal_string_ref(internal_string *s DEBUG_ARGS) { @@ -304,18 +317,22 @@ static void slice_ref(void *p) { internal_string *is = (internal_string *)((char *)p - offsetof(internal_string, refcount)); grpc_mdctx *ctx = is->context; + GPR_TIMER_BEGIN("slice_ref", 0); lock(ctx); INTERNAL_STRING_REF(is); unlock(ctx); + GPR_TIMER_END("slice_ref", 0); } static void slice_unref(void *p) { internal_string *is = (internal_string *)((char *)p - offsetof(internal_string, refcount)); grpc_mdctx *ctx = is->context; + GPR_TIMER_BEGIN("slice_unref", 0); lock(ctx); INTERNAL_STRING_UNREF(is); unlock(ctx); + GPR_TIMER_END("slice_unref", 0); } grpc_mdstr *grpc_mdstr_from_string(grpc_mdctx *ctx, const char *str) { @@ -334,6 +351,7 @@ grpc_mdstr *grpc_mdstr_from_buffer(grpc_mdctx *ctx, const gpr_uint8 *buf, gpr_uint32 hash = gpr_murmur_hash3(buf, length, ctx->hash_seed); internal_string *s; + GPR_TIMER_BEGIN("grpc_mdstr_from_buffer", 0); lock(ctx); /* search for an existing string */ @@ -342,6 +360,7 @@ grpc_mdstr *grpc_mdstr_from_buffer(grpc_mdctx *ctx, const gpr_uint8 *buf, 0 == memcmp(buf, GPR_SLICE_START_PTR(s->slice), length)) { INTERNAL_STRING_REF(s); unlock(ctx); + GPR_TIMER_END("grpc_mdstr_from_buffer", 0); return (grpc_mdstr *)s; } } @@ -382,6 +401,7 @@ grpc_mdstr *grpc_mdstr_from_buffer(grpc_mdctx *ctx, const gpr_uint8 *buf, } unlock(ctx); + GPR_TIMER_END("grpc_mdstr_from_buffer", 0); return (grpc_mdstr *)s; } @@ -391,6 +411,7 @@ static void gc_mdtab(grpc_mdctx *ctx) { internal_metadata **prev_next; internal_metadata *md, *next; + GPR_TIMER_BEGIN("gc_mdtab", 0); for (i = 0; i < ctx->mdtab_capacity; i++) { prev_next = &ctx->mdtab[i]; for (md = ctx->mdtab[i]; md; md = next) { @@ -412,17 +433,19 @@ static void gc_mdtab(grpc_mdctx *ctx) { } } } - - GPR_ASSERT(ctx->mdtab_free == 0); + GPR_TIMER_END("gc_mdtab", 0); } static void grow_mdtab(grpc_mdctx *ctx) { size_t capacity = ctx->mdtab_capacity * 2; size_t i; - internal_metadata **mdtab = - gpr_malloc(sizeof(internal_metadata *) * capacity); + internal_metadata **mdtab; internal_metadata *md, *next; gpr_uint32 hash; + + GPR_TIMER_BEGIN("grow_mdtab", 0); + + mdtab = gpr_malloc(sizeof(internal_metadata *) * capacity); memset(mdtab, 0, sizeof(internal_metadata *) * capacity); for (i = 0; i < ctx->mdtab_capacity; i++) { @@ -437,6 +460,8 @@ static void grow_mdtab(grpc_mdctx *ctx) { gpr_free(ctx->mdtab); ctx->mdtab = mdtab; ctx->mdtab_capacity = capacity; + + GPR_TIMER_END("grow_mdtab", 0); } static void rehash_mdtab(grpc_mdctx *ctx) { @@ -458,6 +483,8 @@ grpc_mdelem *grpc_mdelem_from_metadata_strings(grpc_mdctx *ctx, GPR_ASSERT(key->context == ctx); GPR_ASSERT(value->context == ctx); + GPR_TIMER_BEGIN("grpc_mdelem_from_metadata_strings", 0); + lock(ctx); /* search for an existing pair */ @@ -467,6 +494,7 @@ grpc_mdelem *grpc_mdelem_from_metadata_strings(grpc_mdctx *ctx, INTERNAL_STRING_UNREF(key); INTERNAL_STRING_UNREF(value); unlock(ctx); + GPR_TIMER_END("grpc_mdelem_from_metadata_strings", 0); return (grpc_mdelem *)md; } } @@ -496,6 +524,8 @@ grpc_mdelem *grpc_mdelem_from_metadata_strings(grpc_mdctx *ctx, unlock(ctx); + GPR_TIMER_END("grpc_mdelem_from_metadata_strings", 0); + return (grpc_mdelem *)md; } @@ -542,6 +572,7 @@ grpc_mdelem *grpc_mdelem_ref(grpc_mdelem *gmd DEBUG_ARGS) { void grpc_mdelem_unref(grpc_mdelem *gmd DEBUG_ARGS) { internal_metadata *md = (internal_metadata *)gmd; + if (!md) return; #ifdef GRPC_METADATA_REFCOUNT_DEBUG gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "ELM UNREF:%p:%d->%d: '%s' = '%s'", md, @@ -552,12 +583,14 @@ void grpc_mdelem_unref(grpc_mdelem *gmd DEBUG_ARGS) { #endif if (2 == gpr_atm_full_fetch_add(&md->refcnt, -1)) { grpc_mdctx *ctx = md->context; + GPR_TIMER_BEGIN("grpc_mdelem_unref.to_zero", 0); lock(ctx); if (1 == gpr_atm_no_barrier_load(&md->refcnt)) { ctx->mdtab_free++; gpr_atm_no_barrier_store(&md->refcnt, 0); } unlock(ctx); + GPR_TIMER_END("grpc_mdelem_unref.to_zero", 0); } } diff --git a/src/core/transport/stream_op.c b/src/core/transport/metadata_batch.c index 6493e77bc5..c5d39e0c9f 100644 --- a/src/core/transport/stream_op.c +++ b/src/core/transport/metadata_batch.c @@ -31,7 +31,7 @@ * */ -#include "src/core/transport/stream_op.h" +#include "src/core/transport/metadata_batch.h" #include <string.h> @@ -40,143 +40,6 @@ #include "src/core/profiling/timers.h" -/* Exponential growth function: Given x, return a larger x. - Currently we grow by 1.5 times upon reallocation. */ -#define GROW(x) (3 * (x) / 2) - -void grpc_sopb_init(grpc_stream_op_buffer *sopb) { - sopb->ops = sopb->inlined_ops; - sopb->nops = 0; - sopb->capacity = GRPC_SOPB_INLINE_ELEMENTS; -} - -void grpc_sopb_destroy(grpc_stream_op_buffer *sopb) { - grpc_stream_ops_unref_owned_objects(sopb->ops, sopb->nops); - if (sopb->ops != sopb->inlined_ops) gpr_free(sopb->ops); -} - -void grpc_sopb_reset(grpc_stream_op_buffer *sopb) { - grpc_stream_ops_unref_owned_objects(sopb->ops, sopb->nops); - sopb->nops = 0; -} - -void grpc_sopb_swap(grpc_stream_op_buffer *a, grpc_stream_op_buffer *b) { - GPR_SWAP(size_t, a->nops, b->nops); - GPR_SWAP(size_t, a->capacity, b->capacity); - - if (a->ops == a->inlined_ops) { - if (b->ops == b->inlined_ops) { - /* swap contents of inlined buffer */ - grpc_stream_op temp[GRPC_SOPB_INLINE_ELEMENTS]; - memcpy(temp, a->ops, b->nops * sizeof(grpc_stream_op)); - memcpy(a->ops, b->ops, a->nops * sizeof(grpc_stream_op)); - memcpy(b->ops, temp, b->nops * sizeof(grpc_stream_op)); - } else { - /* a is inlined, b is not - copy a inlined into b, fix pointers */ - a->ops = b->ops; - b->ops = b->inlined_ops; - memcpy(b->ops, a->inlined_ops, b->nops * sizeof(grpc_stream_op)); - } - } else if (b->ops == b->inlined_ops) { - /* b is inlined, a is not - copy b inlined int a, fix pointers */ - b->ops = a->ops; - a->ops = a->inlined_ops; - memcpy(a->ops, b->inlined_ops, a->nops * sizeof(grpc_stream_op)); - } else { - /* no inlining: easy swap */ - GPR_SWAP(grpc_stream_op *, a->ops, b->ops); - } -} - -void grpc_stream_ops_unref_owned_objects(grpc_stream_op *ops, size_t nops) { - size_t i; - for (i = 0; i < nops; i++) { - switch (ops[i].type) { - case GRPC_OP_SLICE: - gpr_slice_unref(ops[i].data.slice); - break; - case GRPC_OP_METADATA: - grpc_metadata_batch_destroy(&ops[i].data.metadata); - break; - case GRPC_NO_OP: - case GRPC_OP_BEGIN_MESSAGE: - break; - } - } -} - -static void expandto(grpc_stream_op_buffer *sopb, size_t new_capacity) { - sopb->capacity = new_capacity; - if (sopb->ops == sopb->inlined_ops) { - sopb->ops = gpr_malloc(sizeof(grpc_stream_op) * new_capacity); - memcpy(sopb->ops, sopb->inlined_ops, sopb->nops * sizeof(grpc_stream_op)); - } else { - sopb->ops = gpr_realloc(sopb->ops, sizeof(grpc_stream_op) * new_capacity); - } -} - -static grpc_stream_op *add(grpc_stream_op_buffer *sopb) { - grpc_stream_op *out; - - GPR_ASSERT(sopb->nops <= sopb->capacity); - if (sopb->nops == sopb->capacity) { - expandto(sopb, GROW(sopb->capacity)); - } - out = sopb->ops + sopb->nops; - sopb->nops++; - return out; -} - -void grpc_sopb_add_no_op(grpc_stream_op_buffer *sopb) { - add(sopb)->type = GRPC_NO_OP; -} - -void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length, - gpr_uint32 flags) { - grpc_stream_op *op = add(sopb); - op->type = GRPC_OP_BEGIN_MESSAGE; - op->data.begin_message.length = length; - op->data.begin_message.flags = flags; -} - -void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, - grpc_metadata_batch b) { - grpc_stream_op *op = add(sopb); - op->type = GRPC_OP_METADATA; - op->data.metadata = b; -} - -void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice) { - grpc_stream_op *op = add(sopb); - op->type = GRPC_OP_SLICE; - op->data.slice = slice; -} - -void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops, - size_t nops) { - size_t orig_nops = sopb->nops; - size_t new_nops = orig_nops + nops; - - if (new_nops > sopb->capacity) { - expandto(sopb, GPR_MAX(GROW(sopb->capacity), new_nops)); - } - - memcpy(sopb->ops + orig_nops, ops, sizeof(grpc_stream_op) * nops); - sopb->nops = new_nops; -} - -void grpc_sopb_move_to(grpc_stream_op_buffer *src, grpc_stream_op_buffer *dst) { - if (src->nops == 0) { - return; - } - if (dst->nops == 0) { - grpc_sopb_swap(src, dst); - return; - } - grpc_sopb_append(dst, src->ops, src->nops); - src->nops = 0; -} - static void assert_valid_list(grpc_mdelem_list *list) { #ifndef NDEBUG grpc_linked_mdelem *l; @@ -200,13 +63,11 @@ static void assert_valid_list(grpc_mdelem_list *list) { #ifndef NDEBUG void grpc_metadata_batch_assert_ok(grpc_metadata_batch *batch) { assert_valid_list(&batch->list); - assert_valid_list(&batch->garbage); } #endif /* NDEBUG */ void grpc_metadata_batch_init(grpc_metadata_batch *batch) { - batch->list.head = batch->list.tail = batch->garbage.head = - batch->garbage.tail = NULL; + batch->list.head = batch->list.tail = NULL; batch->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); } @@ -215,9 +76,6 @@ void grpc_metadata_batch_destroy(grpc_metadata_batch *batch) { for (l = batch->list.head; l; l = l->next) { GRPC_MDELEM_UNREF(l->md); } - for (l = batch->garbage.head; l; l = l->next) { - GRPC_MDELEM_UNREF(l->md); - } } void grpc_metadata_batch_add_head(grpc_metadata_batch *batch, @@ -283,10 +141,6 @@ void grpc_metadata_batch_merge(grpc_metadata_batch *target, next = l->next; link_tail(&target->list, l); } - for (l = to_add->garbage.head; l; l = next) { - next = l->next; - link_tail(&target->garbage, l); - } } void grpc_metadata_batch_move(grpc_metadata_batch *dst, @@ -305,7 +159,6 @@ void grpc_metadata_batch_filter(grpc_metadata_batch *batch, GPR_TIMER_BEGIN("grpc_metadata_batch_filter", 0); assert_valid_list(&batch->list); - assert_valid_list(&batch->garbage); for (l = batch->list.head; l; l = next) { grpc_mdelem *orig = l->md; grpc_mdelem *filt = filter(user_data, orig); @@ -324,14 +177,28 @@ void grpc_metadata_batch_filter(grpc_metadata_batch *batch, batch->list.tail = l->prev; } assert_valid_list(&batch->list); - link_head(&batch->garbage, l); + GRPC_MDELEM_UNREF(l->md); } else if (filt != orig) { GRPC_MDELEM_UNREF(orig); l->md = filt; } } assert_valid_list(&batch->list); - assert_valid_list(&batch->garbage); GPR_TIMER_END("grpc_metadata_batch_filter", 0); } + +static grpc_mdelem *no_metadata_for_you(void *user_data, grpc_mdelem *elem) { + return NULL; +} + +void grpc_metadata_batch_clear(grpc_metadata_batch *batch) { + batch->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + grpc_metadata_batch_filter(batch, no_metadata_for_you, NULL); +} + +int grpc_metadata_batch_is_empty(grpc_metadata_batch *batch) { + return batch->list.head == NULL && + gpr_time_cmp(gpr_inf_future(batch->deadline.clock_type), + batch->deadline) == 0; +} diff --git a/src/core/transport/stream_op.h b/src/core/transport/metadata_batch.h index 37f18b02d9..fc3a46004f 100644 --- a/src/core/transport/stream_op.h +++ b/src/core/transport/metadata_batch.h @@ -40,39 +40,6 @@ #include <grpc/support/time.h> #include "src/core/transport/metadata.h" -/* this many stream ops are inlined into a sopb before allocating */ -#define GRPC_SOPB_INLINE_ELEMENTS 4 - -/* Operations that can be performed on a stream. - Used by grpc_stream_op. */ -typedef enum grpc_stream_op_code { - /* Do nothing code. Useful if rewriting a batch to exclude some operations. - Must be ignored by receivers */ - GRPC_NO_OP, - GRPC_OP_METADATA, - /* Begin a message/metadata element/status - as defined by - grpc_message_type. */ - GRPC_OP_BEGIN_MESSAGE, - /* Add a slice of data to the current message/metadata element/status. - Must not overflow the forward declared length. */ - GRPC_OP_SLICE -} grpc_stream_op_code; - -/** Internal bit flag for grpc_begin_message's \a flags signaling the use of - * compression for the message */ -#define GRPC_WRITE_INTERNAL_COMPRESS (0x80000000u) -/** Mask of all valid internal flags. */ -#define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS) - -/* Arguments for GRPC_OP_BEGIN_MESSAGE */ -typedef struct grpc_begin_message { - /* How many bytes of data will this message contain */ - gpr_uint32 length; - /* Write flags for the message: see grpc.h GRPC_WRITE_* for the public bits, - * GRPC_WRITE_INTERNAL_* for the internal ones. */ - gpr_uint32 flags; -} grpc_begin_message; - typedef struct grpc_linked_mdelem { grpc_mdelem *md; struct grpc_linked_mdelem *next; @@ -88,10 +55,6 @@ typedef struct grpc_mdelem_list { typedef struct grpc_metadata_batch { /** Metadata elements in this batch */ grpc_mdelem_list list; - /** Elements that have been removed from the batch, but have - not yet been unreffed - used to allow collecting garbage - under a single metadata context lock */ - grpc_mdelem_list garbage; /** Used to calculate grpc-timeout at the point of sending, or gpr_inf_future if this batch does not need to send a grpc-timeout */ @@ -102,6 +65,8 @@ void grpc_metadata_batch_init(grpc_metadata_batch *batch); void grpc_metadata_batch_destroy(grpc_metadata_batch *batch); void grpc_metadata_batch_merge(grpc_metadata_batch *target, grpc_metadata_batch *add); +void grpc_metadata_batch_clear(grpc_metadata_batch *batch); +int grpc_metadata_batch_is_empty(grpc_metadata_batch *batch); /** Moves the metadata information from \a src to \a dst. Upon return, \a src is * zeroed. */ @@ -159,54 +124,4 @@ void grpc_metadata_batch_assert_ok(grpc_metadata_batch *comd); } while (0) #endif -/* Represents a single operation performed on a stream/transport */ -typedef struct grpc_stream_op { - /* the operation to be applied */ - enum grpc_stream_op_code type; - /* the arguments to this operation. union fields are named according to the - associated op-code */ - union { - grpc_begin_message begin_message; - grpc_metadata_batch metadata; - gpr_slice slice; - } data; -} grpc_stream_op; - -/** A stream op buffer is a wrapper around stream operations that is - * dynamically extendable. */ -typedef struct grpc_stream_op_buffer { - grpc_stream_op *ops; - size_t nops; - size_t capacity; - grpc_stream_op inlined_ops[GRPC_SOPB_INLINE_ELEMENTS]; -} grpc_stream_op_buffer; - -/* Initialize a stream op buffer */ -void grpc_sopb_init(grpc_stream_op_buffer *sopb); -/* Destroy a stream op buffer */ -void grpc_sopb_destroy(grpc_stream_op_buffer *sopb); -/* Reset a sopb to no elements */ -void grpc_sopb_reset(grpc_stream_op_buffer *sopb); -/* Swap two sopbs */ -void grpc_sopb_swap(grpc_stream_op_buffer *a, grpc_stream_op_buffer *b); - -void grpc_stream_ops_unref_owned_objects(grpc_stream_op *ops, size_t nops); - -/* Append a GRPC_NO_OP to a buffer */ -void grpc_sopb_add_no_op(grpc_stream_op_buffer *sopb); -/* Append a GRPC_OP_BEGIN to a buffer */ -void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length, - gpr_uint32 flags); -void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, - grpc_metadata_batch metadata); -/* Append a GRPC_SLICE to a buffer - does not ref/unref the slice */ -void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice); -/* Append a buffer to a buffer - does not ref/unref any internal objects */ -void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops, - size_t nops); - -void grpc_sopb_move_to(grpc_stream_op_buffer *src, grpc_stream_op_buffer *dst); - -char *grpc_sopb_string(grpc_stream_op_buffer *sopb); - #endif /* GRPC_INTERNAL_CORE_TRANSPORT_STREAM_OP_H */ diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index 828d212cfe..f2bebc62f3 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -33,9 +33,36 @@ #include "src/core/transport/transport.h" #include <grpc/support/alloc.h> +#include <grpc/support/atm.h> #include <grpc/support/log.h> #include "src/core/transport/transport_impl.h" +#ifdef GRPC_STREAM_REFCOUNT_DEBUG +void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason) { + gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count); + gpr_log(GPR_DEBUG, "STREAM %p:%p REF %d->%d %s", refcount, + refcount->destroy.cb_arg, val, val + 1, reason); +#else +void grpc_stream_ref(grpc_stream_refcount *refcount) { +#endif + gpr_ref(&refcount->refs); +} + +#ifdef GRPC_STREAM_REFCOUNT_DEBUG +void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount, + const char *reason) { + gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count); + gpr_log(GPR_DEBUG, "STREAM %p:%p UNREF %d->%d %s", refcount, + refcount->destroy.cb_arg, val, val - 1, reason); +#else +void grpc_stream_unref(grpc_exec_ctx *exec_ctx, + grpc_stream_refcount *refcount) { +#endif + if (gpr_unref(&refcount->refs)) { + grpc_exec_ctx_enqueue(exec_ctx, &refcount->destroy, 1); + } +} + size_t grpc_transport_stream_size(grpc_transport *transport) { return transport->vtable->sizeof_stream; } @@ -47,10 +74,10 @@ void grpc_transport_destroy(grpc_exec_ctx *exec_ctx, int grpc_transport_init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *transport, grpc_stream *stream, - const void *server_data, - grpc_transport_stream_op *initial_op) { - return transport->vtable->init_stream(exec_ctx, transport, stream, - server_data, initial_op); + grpc_stream_refcount *refcount, + const void *server_data) { + return transport->vtable->init_stream(exec_ctx, transport, stream, refcount, + server_data); } void grpc_transport_perform_stream_op(grpc_exec_ctx *exec_ctx, @@ -66,6 +93,12 @@ void grpc_transport_perform_op(grpc_exec_ctx *exec_ctx, transport->vtable->perform_op(exec_ctx, transport, op); } +void grpc_transport_set_pollset(grpc_exec_ctx *exec_ctx, + grpc_transport *transport, grpc_stream *stream, + grpc_pollset *pollset) { + transport->vtable->set_pollset(exec_ctx, transport, stream, pollset); +} + void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *transport, grpc_stream *stream) { @@ -79,9 +112,8 @@ char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, void grpc_transport_stream_op_finish_with_failure( grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op) { - grpc_exec_ctx_enqueue(exec_ctx, op->on_done_recv, 0); - grpc_exec_ctx_enqueue(exec_ctx, op->on_done_send, 0); - grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 0); + grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, 0); + grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, 0); } void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, @@ -129,9 +161,9 @@ void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op, if (optional_message) { cmd = gpr_malloc(sizeof(*cmd)); cmd->message = *optional_message; - cmd->then_call = op->on_consumed; + cmd->then_call = op->on_complete; grpc_closure_init(&cmd->closure, free_message, cmd); - op->on_consumed = &cmd->closure; + op->on_complete = &cmd->closure; op->optional_close_message = &cmd->message; } op->close_with_status = status; diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index d4cee03862..f296ce8251 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -38,7 +38,8 @@ #include "src/core/iomgr/pollset.h" #include "src/core/iomgr/pollset_set.h" -#include "src/core/transport/stream_op.h" +#include "src/core/transport/metadata_batch.h" +#include "src/core/transport/byte_stream.h" #include "src/core/channel/context.h" /* forward declarations */ @@ -49,36 +50,35 @@ typedef struct grpc_transport grpc_transport; for a stream. */ typedef struct grpc_stream grpc_stream; -/* Represents the send/recv closed state of a stream. */ -typedef enum grpc_stream_state { - /* the stream is open for sends and receives */ - GRPC_STREAM_OPEN, - /* the stream is closed for sends, but may still receive data */ - GRPC_STREAM_SEND_CLOSED, - /* the stream is closed for receives, but may still send data */ - GRPC_STREAM_RECV_CLOSED, - /* the stream is closed for both sends and receives */ - GRPC_STREAM_CLOSED -} grpc_stream_state; +typedef struct grpc_stream_refcount { + gpr_refcount refs; + grpc_closure destroy; +} grpc_stream_refcount; + +/*#define GRPC_STREAM_REFCOUNT_DEBUG*/ +#ifdef GRPC_STREAM_REFCOUNT_DEBUG +void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason); +void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount, + const char *reason); +#else +void grpc_stream_ref(grpc_stream_refcount *refcount); +void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount); +#endif /* Transport stream op: a set of operations to perform on a transport against a single stream */ typedef struct grpc_transport_stream_op { - grpc_closure *on_consumed; + grpc_metadata_batch *send_initial_metadata; + grpc_metadata_batch *send_trailing_metadata; - grpc_stream_op_buffer *send_ops; - int is_last_send; - grpc_closure *on_done_send; + grpc_byte_stream *send_message; - grpc_stream_op_buffer *recv_ops; - grpc_stream_state *recv_state; - /** The number of bytes this peer is currently prepared to receive. - These bytes will be eventually used to replenish per-stream flow control - windows. */ - size_t max_recv_bytes; - grpc_closure *on_done_recv; + grpc_metadata_batch *recv_initial_metadata; + grpc_byte_stream **recv_message; + grpc_closure *recv_message_ready; + grpc_metadata_batch *recv_trailing_metadata; - grpc_pollset *bind_pollset; + grpc_closure *on_complete; /** If != GRPC_STATUS_OK, cancel this stream */ grpc_status_code cancel_with_status; @@ -110,8 +110,8 @@ typedef struct grpc_transport_op { gpr_slice *goaway_message; /** set the callback for accepting new streams; this is a permanent callback, unlike the other one-shot closures */ - void (*set_accept_stream)(void *user_data, grpc_transport *transport, - const void *server_data); + void (*set_accept_stream)(grpc_exec_ctx *exec_ctx, void *user_data, + grpc_transport *transport, const void *server_data); void *set_accept_stream_user_data; /** add this transport to a pollset */ grpc_pollset *bind_pollset; @@ -136,8 +136,12 @@ size_t grpc_transport_stream_size(grpc_transport *transport); supplied from the accept_stream callback function */ int grpc_transport_init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *transport, grpc_stream *stream, - const void *server_data, - grpc_transport_stream_op *initial_op); + grpc_stream_refcount *refcount, + const void *server_data); + +void grpc_transport_set_pollset(grpc_exec_ctx *exec_ctx, + grpc_transport *transport, grpc_stream *stream, + grpc_pollset *pollset); /* Destroy transport data for a stream. diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h index 900c6340ff..40bfb4b13a 100644 --- a/src/core/transport/transport_impl.h +++ b/src/core/transport/transport_impl.h @@ -43,8 +43,12 @@ typedef struct grpc_transport_vtable { /* implementation of grpc_transport_init_stream */ int (*init_stream)(grpc_exec_ctx *exec_ctx, grpc_transport *self, - grpc_stream *stream, const void *server_data, - grpc_transport_stream_op *initial_op); + grpc_stream *stream, grpc_stream_refcount *refcount, + const void *server_data); + + /* implementation of grpc_transport_set_pollset */ + void (*set_pollset)(grpc_exec_ctx *exec_ctx, grpc_transport *self, + grpc_stream *stream, grpc_pollset *pollset); /* implementation of grpc_transport_perform_stream_op */ void (*perform_stream_op)(grpc_exec_ctx *exec_ctx, grpc_transport *self, diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c index f62c340e97..f3b6db29d6 100644 --- a/src/core/transport/transport_op_string.c +++ b/src/core/transport/transport_op_string.c @@ -69,42 +69,6 @@ static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) { } } -char *grpc_sopb_string(grpc_stream_op_buffer *sopb) { - char *out; - char *tmp; - size_t i; - gpr_strvec b; - gpr_strvec_init(&b); - - for (i = 0; i < sopb->nops; i++) { - grpc_stream_op *op = &sopb->ops[i]; - if (i > 0) gpr_strvec_add(&b, gpr_strdup(", ")); - switch (op->type) { - case GRPC_NO_OP: - gpr_strvec_add(&b, gpr_strdup("NO_OP")); - break; - case GRPC_OP_BEGIN_MESSAGE: - gpr_asprintf(&tmp, "BEGIN_MESSAGE:%d", op->data.begin_message.length); - gpr_strvec_add(&b, tmp); - break; - case GRPC_OP_SLICE: - gpr_asprintf(&tmp, "SLICE:%d", GPR_SLICE_LENGTH(op->data.slice)); - gpr_strvec_add(&b, tmp); - break; - case GRPC_OP_METADATA: - gpr_strvec_add(&b, gpr_strdup("METADATA{")); - put_metadata_list(&b, op->data.metadata); - gpr_strvec_add(&b, gpr_strdup("}")); - break; - } - } - - out = gpr_strvec_flatten(&b, NULL); - gpr_strvec_destroy(&b); - - return out; -} - char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) { char *tmp; char *out; @@ -113,42 +77,52 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) { gpr_strvec b; gpr_strvec_init(&b); - if (op->send_ops) { + if (op->send_initial_metadata != NULL) { if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); first = 0; - gpr_asprintf(&tmp, "SEND%s:%p", op->is_last_send ? "_LAST" : "", - op->on_done_send); - gpr_strvec_add(&b, tmp); - gpr_strvec_add(&b, gpr_strdup("[")); - gpr_strvec_add(&b, grpc_sopb_string(op->send_ops)); - gpr_strvec_add(&b, gpr_strdup("]")); + gpr_strvec_add(&b, gpr_strdup("SEND_INITIAL_METADATA{")); + put_metadata_list(&b, *op->send_initial_metadata); + gpr_strvec_add(&b, gpr_strdup("}")); } - if (op->recv_ops) { + if (op->send_message != NULL) { if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); first = 0; - gpr_asprintf(&tmp, "RECV:%p:max_recv_bytes=%d", op->on_done_recv, - op->max_recv_bytes); + gpr_asprintf(&tmp, "SEND_MESSAGE:flags=0x%08x:len=%d", + op->send_message->flags, op->send_message->length); gpr_strvec_add(&b, tmp); } - if (op->bind_pollset) { + if (op->send_trailing_metadata != NULL) { if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); first = 0; - gpr_strvec_add(&b, gpr_strdup("BIND")); + gpr_strvec_add(&b, gpr_strdup("SEND_TRAILING_METADATA{")); + put_metadata_list(&b, *op->send_trailing_metadata); + gpr_strvec_add(&b, gpr_strdup("}")); } - if (op->cancel_with_status != GRPC_STATUS_OK) { + if (op->recv_initial_metadata != NULL) { if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); first = 0; - gpr_asprintf(&tmp, "CANCEL:%d", op->cancel_with_status); - gpr_strvec_add(&b, tmp); + gpr_strvec_add(&b, gpr_strdup("RECV_INITIAL_METADATA")); + } + + if (op->recv_message != NULL) { + if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); + first = 0; + gpr_strvec_add(&b, gpr_strdup("RECV_MESSAGE")); } - if (op->on_consumed != NULL) { + if (op->recv_trailing_metadata != NULL) { if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); first = 0; - gpr_asprintf(&tmp, "ON_CONSUMED:%p", op->on_consumed); + gpr_strvec_add(&b, gpr_strdup("RECV_TRAILING_METADATA")); + } + + if (op->cancel_with_status != GRPC_STATUS_OK) { + if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); + first = 0; + gpr_asprintf(&tmp, "CANCEL:%d", op->cancel_with_status); gpr_strvec_add(&b, tmp); } |