diff options
Diffstat (limited to 'src/core/transport/chttp2/parsing.c')
-rw-r--r-- | src/core/transport/chttp2/parsing.c | 137 |
1 files changed, 137 insertions, 0 deletions
diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index e2c39bec3c..9f50194125 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -32,6 +32,9 @@ */ #include "src/core/transport/chttp2/internal.h" + +#include <string.h> + #include "src/core/transport/chttp2/timeout_encoding.h" #include <grpc/support/alloc.h> @@ -50,6 +53,9 @@ static int init_skip_frame_parser(grpc_chttp2_transport_parsing *transport_parsi static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice, int is_last); void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_parsing *transport_parsing) { + grpc_chttp2_stream_global *stream_global; + grpc_chttp2_stream_parsing *stream_parsing; + /* transport_parsing->last_incoming_stream_id is used as last-grpc_chttp2_stream-id when sending GOAWAY frame. https://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-6.8 @@ -59,6 +65,84 @@ void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *transport_global, g if (!transport_parsing->is_client) { transport_global->last_incoming_stream_id = transport_parsing->incoming_stream_id; } + + /* TODO(ctiller): re-implement */ + GPR_ASSERT(transport_parsing->initial_window_update == 0); + +#if 0 + while ((s = stream_list_remove_head(t, FINISHED_READ_OP)) != NULL) { + int publish = 0; + GPR_ASSERT(s->incoming_sopb); + *s->publish_state = + compute_state(s->write_state == WRITE_STATE_SENT_CLOSE, s->read_closed); + if (*s->publish_state != s->published_state) { + s->published_state = *s->publish_state; + publish = 1; + if (s->published_state == GRPC_STREAM_CLOSED) { + remove_from_stream_map(t, s); + } + } + if (s->parser.incoming_sopb.nops > 0) { + grpc_sopb_swap(s->incoming_sopb, &s->parser.incoming_sopb); + publish = 1; + } + if (publish) { + if (s->incoming_metadata_count > 0) { + patch_metadata_ops(s); + } + s->incoming_sopb = NULL; + schedule_cb(t, s->global.recv_done_closure, 1); + } + } +#endif + + /* copy parsing qbuf to global qbuf */ + gpr_slice_buffer_move_into(&transport_parsing->qbuf, &transport_global->qbuf); + + /* update global settings */ + if (transport_parsing->settings_updated) { + memcpy(transport_global->settings[PEER_SETTINGS], transport_parsing->settings, sizeof(transport_parsing->settings)); + transport_parsing->settings_updated = 0; + } + + /* update settings based on ack if received */ + if (transport_parsing->settings_ack_received) { + memcpy(transport_global->settings[ACKED_SETTINGS], transport_global->settings[SENT_SETTINGS], + GRPC_CHTTP2_NUM_SETTINGS * sizeof(gpr_uint32)); + transport_parsing->settings_ack_received = 0; + } + + /* move goaway to the global state if we received one (it will be + published later */ + if (transport_parsing->goaway_received) { + gpr_slice_unref(transport_global->goaway_text); + transport_global->goaway_text = gpr_slice_ref(transport_parsing->goaway_text); + transport_global->goaway_error = transport_parsing->goaway_error; + transport_global->have_goaway = 1; + transport_parsing->goaway_received = 0; + } + + /* for each stream that saw an update, fixup global state */ + while (grpc_chttp2_list_pop_parsing_seen_stream(transport_global, transport_parsing, &stream_global, &stream_parsing)) { + /* update incoming flow control window */ + if (stream_parsing->incoming_window_delta) { + stream_global->incoming_window -= stream_parsing->incoming_window_delta; + stream_parsing->incoming_window_delta = 0; + grpc_chttp2_list_add_writable_window_update_stream(transport_global, stream_global); + } + + /* update outgoing flow control window */ + if (stream_parsing->outgoing_window_update) { + int was_zero = stream_global->outgoing_window <= 0; + int is_zero; + stream_global->outgoing_window += stream_parsing->outgoing_window_update; + stream_parsing->outgoing_window_update = 0; + is_zero = stream_global->outgoing_window <= 0; + if (was_zero && !is_zero) { + grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + } + } + } } int grpc_chttp2_perform_read(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice) { @@ -555,6 +639,59 @@ void grpc_chttp2_parsing_add_metadata_batch(grpc_chttp2_transport_parsing *trans grpc_sopb_add_metadata(&stream_parsing->data_parser.incoming_sopb, b); } +static void patch_metadata_ops(grpc_chttp2_stream_global *stream_global, grpc_chttp2_stream_parsing *stream_parsing) { + grpc_stream_op *ops = stream_global->incoming_sopb->ops; + size_t nops = stream_global->incoming_sopb->nops; + size_t i; + size_t j; + size_t mdidx = 0; + size_t last_mdidx; + int found_metadata = 0; + + /* rework the array of metadata into a linked list, making use + of the breadcrumbs we left in metadata batches during + add_metadata_batch */ + for (i = 0; i < nops; i++) { + grpc_stream_op *op = &ops[i]; + if (op->type != GRPC_OP_METADATA) continue; + found_metadata = 1; + /* we left a breadcrumb indicating where the end of this list is, + and since we add sequentially, we know from the end of the last + segment where this segment begins */ + last_mdidx = (size_t)(gpr_intptr)(op->data.metadata.list.tail); + GPR_ASSERT(last_mdidx > mdidx); + GPR_ASSERT(last_mdidx <= stream_parsing->incoming_metadata_count); + /* turn the array into a doubly linked list */ + op->data.metadata.list.head = &stream_parsing->incoming_metadata[mdidx]; + op->data.metadata.list.tail = &stream_parsing->incoming_metadata[last_mdidx - 1]; + for (j = mdidx + 1; j < last_mdidx; j++) { + stream_parsing->incoming_metadata[j].prev = &stream_parsing->incoming_metadata[j - 1]; + stream_parsing->incoming_metadata[j - 1].next = &stream_parsing->incoming_metadata[j]; + } + stream_parsing->incoming_metadata[mdidx].prev = NULL; + stream_parsing->incoming_metadata[last_mdidx - 1].next = NULL; + /* track where we're up to */ + mdidx = last_mdidx; + } + if (found_metadata) { + stream_parsing->old_incoming_metadata = stream_parsing->incoming_metadata; + if (mdidx != stream_parsing->incoming_metadata_count) { + /* we have a partially read metadata batch still in incoming_metadata */ + size_t new_count = stream_parsing->incoming_metadata_count - mdidx; + size_t copy_bytes = sizeof(*stream_parsing->incoming_metadata) * new_count; + GPR_ASSERT(mdidx < stream_parsing->incoming_metadata_count); + stream_parsing->incoming_metadata = gpr_malloc(copy_bytes); + memcpy(stream_parsing->old_incoming_metadata + mdidx, stream_parsing->incoming_metadata, + copy_bytes); + stream_parsing->incoming_metadata_count = stream_parsing->incoming_metadata_capacity = new_count; + } else { + stream_parsing->incoming_metadata = NULL; + stream_parsing->incoming_metadata_count = 0; + stream_parsing->incoming_metadata_capacity = 0; + } + } +} + static int parse_frame_slice(grpc_chttp2_transport_parsing *transport_parsing, gpr_slice slice, int is_last) { grpc_chttp2_stream_parsing *stream_parsing = transport_parsing->incoming_stream; switch (transport_parsing->parser(transport_parsing->parser_data, transport_parsing, stream_parsing, slice, is_last)) { |