diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/transport/chttp2_transport.c | 179 |
1 files changed, 101 insertions, 78 deletions
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index caaced75c4..f3af5b1320 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -104,6 +104,10 @@ typedef enum { /* streams that have finished reading: we wait until unlock to coalesce all changes into one callback */ FINISHED_READ_OP, + MAYBE_FINISH_READ_AFTER_PARSE, + PARSER_CHECK_WINDOW_UPDATES_AFTER_PARSE, + OTHER_CHECK_WINDOW_UPDATES_AFTER_PARSE, + NEW_OUTGOING_WINDOW, STREAM_LIST_COUNT /* must be last */ } stream_list_id; @@ -229,6 +233,7 @@ struct transport { /* basic state management - what are we doing at the moment? */ gpr_uint8 reading; + gpr_uint8 parsing; gpr_uint8 writing; /** are we calling back (via cb) with a channel-level event */ gpr_uint8 calling_back_channel; @@ -254,6 +259,7 @@ struct transport { /* window management */ gpr_uint32 outgoing_window; + gpr_uint32 outgoing_window_update; gpr_uint32 incoming_window; gpr_uint32 connection_window_target; @@ -319,6 +325,7 @@ struct stream { gpr_uint32 incoming_window; gpr_int64 outgoing_window; + gpr_uint32 outgoing_window_update; /* when the application requests writes be closed, the write_closed is 'queued'; when the close is flow controlled into the send path, we are 'sending' it; when the write has been performed it is 'sent' */ @@ -395,7 +402,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, grpc_endpoint_cb_status error); static void schedule_cb(transport *t, op_closure closure, int success); -static void maybe_finish_read(transport *t, stream *s); +static void maybe_finish_read(transport *t, stream *s, int is_parser); static void maybe_join_window_updates(transport *t, stream *s); static void finish_reads(transport *t); static void add_to_pollset_locked(transport *t, grpc_pollset *pollset); @@ -652,8 +659,8 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, ref_transport(t); + lock(t); if (!server_data) { - lock(t); s->id = 0; s->outgoing_window = 0; s->incoming_window = 0; @@ -675,9 +682,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, if (initial_op) perform_op_locked(t, s, initial_op); - if (!server_data) { - unlock(t); - } + unlock(t); return 0; } @@ -694,16 +699,11 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { gpr_mu_lock(&t->mu); - /* stop parsing if we're currently parsing this stream */ - if (t->deframe_state == DTS_FRAME && t->incoming_stream_id == s->id && - s->id != 0) { - become_skip_parser(t); - } + GPR_ASSERT(s->published_state == GRPC_STREAM_CLOSED || s->id == 0); for (i = 0; i < STREAM_LIST_COUNT; i++) { stream_list_remove(t, s, i); } - remove_from_stream_map(t, s); gpr_mu_unlock(&t->mu); @@ -835,7 +835,9 @@ static void unlock(transport *t) { finalize_cancellations(t); } - finish_reads(t); + if (!t->parsing) { + finish_reads(t); + } /* gather any callbacks that need to be made */ if (!t->calling_back_ops) { @@ -850,7 +852,7 @@ static void unlock(transport *t) { t->cb = NULL; /* no more callbacks */ t->error_state = ERROR_STATE_NOTIFIED; } - if (t->num_pending_goaways) { + if (!t->parsing && t->num_pending_goaways) { goaways = t->pending_goaways; num_goaways = t->num_pending_goaways; t->pending_goaways = NULL; @@ -930,8 +932,10 @@ static int prepare_write(transport *t) { gpr_uint32 window_delta; /* simple writes are queued to qbuf, and flushed here */ - gpr_slice_buffer_swap(&t->qbuf, &t->outbuf); - GPR_ASSERT(t->qbuf.count == 0); + if (!t->parsing) { + gpr_slice_buffer_swap(&t->qbuf, &t->outbuf); + GPR_ASSERT(t->qbuf.count == 0); + } if (t->dirtied_local_settings && !t->sent_local_settings) { gpr_slice_buffer_add( @@ -976,26 +980,28 @@ static int prepare_write(transport *t) { } } - /* for each stream that wants to update its window, add that window here */ - while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) { - window_delta = - t->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] - - s->incoming_window; - if (!s->read_closed && window_delta) { - gpr_slice_buffer_add( - &t->outbuf, grpc_chttp2_window_update_create(s->id, window_delta)); - FLOWCTL_TRACE(t, s, incoming, s->id, window_delta); - s->incoming_window += window_delta; + if (!t->parsing) { + /* for each stream that wants to update its window, add that window here */ + while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) { + window_delta = + t->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] - + s->incoming_window; + if (!s->read_closed && window_delta) { + gpr_slice_buffer_add( + &t->outbuf, grpc_chttp2_window_update_create(s->id, window_delta)); + FLOWCTL_TRACE(t, s, incoming, s->id, window_delta); + s->incoming_window += window_delta; + } } - } - /* if the transport is ready to send a window update, do so here also */ - if (t->incoming_window < t->connection_window_target * 3 / 4) { - window_delta = t->connection_window_target - t->incoming_window; - gpr_slice_buffer_add(&t->outbuf, - grpc_chttp2_window_update_create(0, window_delta)); - FLOWCTL_TRACE(t, t, incoming, 0, window_delta); - t->incoming_window += window_delta; + /* if the transport is ready to send a window update, do so here also */ + if (t->incoming_window < t->connection_window_target * 3 / 4) { + window_delta = t->connection_window_target - t->incoming_window; + gpr_slice_buffer_add(&t->outbuf, + grpc_chttp2_window_update_create(0, window_delta)); + FLOWCTL_TRACE(t, t, incoming, 0, window_delta); + t->incoming_window += window_delta; + } } return t->outbuf.length > 0 || !stream_list_empty(t, WRITING); @@ -1031,7 +1037,7 @@ static void finish_write_common(transport *t, int success) { if (!t->is_client) { s->read_closed = 1; } - maybe_finish_read(t, s); + maybe_finish_read(t, s, 0); } t->outbuf.count = 0; t->outbuf.length = 0; @@ -1089,7 +1095,7 @@ static void add_goaway(transport *t, gpr_uint32 goaway_error, static void maybe_start_some_streams(transport *t) { /* start streams where we have free stream ids and free concurrency */ - while (t->next_stream_id <= MAX_CLIENT_STREAM_ID && + while (!t->parsing && t->next_stream_id <= MAX_CLIENT_STREAM_ID && grpc_chttp2_stream_map_size(&t->stream_map) < t->settings[PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) { @@ -1169,7 +1175,7 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) { s->publish_state = op->recv_state; gpr_free(s->old_incoming_metadata); s->old_incoming_metadata = NULL; - maybe_finish_read(t, s); + maybe_finish_read(t, s, 0); maybe_join_window_updates(t, s); } @@ -1231,7 +1237,7 @@ static void finalize_cancellations(transport *t) { while ((s = stream_list_remove_head(t, CANCELLED))) { s->read_closed = 1; s->write_state = WRITE_STATE_SENT_CLOSE; - maybe_finish_read(t, s); + maybe_finish_read(t, s, 0); } } @@ -1249,7 +1255,8 @@ static void add_incoming_metadata(transport *t, stream *s, grpc_mdelem *elem) { static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, grpc_status_code local_status, grpc_chttp2_error_code error_code, - grpc_mdstr *optional_message, int send_rst) { + grpc_mdstr *optional_message, int send_rst, + int is_parser) { int had_outgoing; char buffer[GPR_LTOA_MIN_BUFSIZE]; @@ -1299,7 +1306,7 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, add_metadata_batch(t, s); } } - maybe_finish_read(t, s); + maybe_finish_read(t, s, is_parser); } if (!id) send_rst = 0; if (send_rst) { @@ -1314,8 +1321,10 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, static void cancel_stream_id(transport *t, gpr_uint32 id, grpc_status_code local_status, grpc_chttp2_error_code error_code, int send_rst) { + lock(t); cancel_stream_inner(t, lookup_stream(t, id), id, local_status, error_code, - NULL, send_rst); + NULL, send_rst, 1); + unlock(t); } static void cancel_stream(transport *t, stream *s, @@ -1323,7 +1332,7 @@ static void cancel_stream(transport *t, stream *s, grpc_chttp2_error_code error_code, grpc_mdstr *optional_message, int send_rst) { cancel_stream_inner(t, s, s->id, local_status, error_code, optional_message, - send_rst); + send_rst, 0); } static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *stream) { @@ -1343,13 +1352,19 @@ static void drop_connection(transport *t) { end_all_the_calls(t); } -static void maybe_finish_read(transport *t, stream *s) { - if (s->incoming_sopb) { +static void maybe_finish_read(transport *t, stream *s, int is_parser) { + if (is_parser) { + stream_list_join(t, s, MAYBE_FINISH_READ_AFTER_PARSE); + } else if (s->incoming_sopb) { stream_list_join(t, s, FINISHED_READ_OP); } } static void maybe_join_window_updates(transport *t, stream *s) { + if (t->parsing) { + stream_list_join(t, s, OTHER_CHECK_WINDOW_UPDATES_AFTER_PARSE); + return; + } if (s->incoming_sopb != NULL && s->incoming_window < t->settings[LOCAL_SETTINGS] @@ -1378,7 +1393,7 @@ static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) { s->incoming_window -= t->incoming_frame_size; /* if the stream incoming window is getting low, schedule an update */ - maybe_join_window_updates(t, s); + stream_list_join(t, s, PARSER_CHECK_WINDOW_UPDATES_AFTER_PARSE); return GRPC_CHTTP2_PARSE_OK; } @@ -1475,7 +1490,7 @@ static void on_header(void *tp, grpc_mdelem *md) { } else { add_incoming_metadata(t, s, md); } - maybe_finish_read(t, s); + maybe_finish_read(t, s, 1); } static int init_header_frame_parser(transport *t, int is_continuation) { @@ -1667,9 +1682,11 @@ static int init_frame_parser(transport *t) { } } +/* static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) { return window + window_update < MAX_WINDOW; } +*/ static void add_metadata_batch(transport *t, stream *s) { grpc_metadata_batch b; @@ -1695,18 +1712,17 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { case GRPC_CHTTP2_PARSE_OK: if (st.end_of_stream) { t->incoming_stream->read_closed = 1; - maybe_finish_read(t, t->incoming_stream); + maybe_finish_read(t, t->incoming_stream, 1); } if (st.need_flush_reads) { - maybe_finish_read(t, t->incoming_stream); + maybe_finish_read(t, t->incoming_stream, 1); } if (st.metadata_boundary) { add_metadata_batch(t, t->incoming_stream); - maybe_finish_read(t, t->incoming_stream); + maybe_finish_read(t, t->incoming_stream, 1); } if (st.ack_settings) { gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create()); - maybe_start_some_streams(t); } if (st.send_ping_ack) { gpr_slice_buffer_add( @@ -1737,13 +1753,8 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { if (st.initial_window_update) { for (i = 0; i < t->stream_map.count; i++) { stream *s = (stream *)(t->stream_map.values[i]); - int was_window_empty = s->outgoing_window <= 0; - FLOWCTL_TRACE(t, s, outgoing, s->id, st.initial_window_update); - s->outgoing_window += st.initial_window_update; - if (was_window_empty && s->outgoing_window > 0 && s->outgoing_sopb && - s->outgoing_sopb->nops > 0) { - stream_list_join(t, s, WRITABLE); - } + s->outgoing_window_update += st.initial_window_update; + stream_list_join(t, s, NEW_OUTGOING_WINDOW); } } if (st.window_update) { @@ -1751,30 +1762,12 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { /* if there was a stream id, this is for some stream */ stream *s = lookup_stream(t, t->incoming_stream_id); if (s) { - int was_window_empty = s->outgoing_window <= 0; - if (!is_window_update_legal(st.window_update, s->outgoing_window)) { - cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status( - GRPC_CHTTP2_FLOW_CONTROL_ERROR), - GRPC_CHTTP2_FLOW_CONTROL_ERROR, NULL, 1); - } else { - FLOWCTL_TRACE(t, s, outgoing, s->id, st.window_update); - s->outgoing_window += st.window_update; - /* if this window update makes outgoing ops writable again, - flag that */ - if (was_window_empty && s->outgoing_sopb && - s->outgoing_sopb->nops > 0) { - stream_list_join(t, s, WRITABLE); - } - } + s->outgoing_window_update += st.window_update; + stream_list_join(t, s, NEW_OUTGOING_WINDOW); } } else { /* transport level window update */ - if (!is_window_update_legal(st.window_update, t->outgoing_window)) { - drop_connection(t); - } else { - FLOWCTL_TRACE(t, t, outgoing, 0, st.window_update); - t->outgoing_window += st.window_update; - } + t->outgoing_window_update += st.window_update; } } return 1; @@ -1979,6 +1972,7 @@ static int process_read(transport *t, gpr_slice slice) { static void recv_data(void *tp, gpr_slice *slices, size_t nslices, grpc_endpoint_cb_status error) { transport *t = tp; + stream *s; size_t i; int keep_reading = 0; @@ -1998,11 +1992,40 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, unref_transport(t); break; case GRPC_ENDPOINT_CB_OK: - lock(t); + gpr_mu_lock(&t->mu); + GPR_ASSERT(!t->parsing); + t->parsing = 1; + gpr_mu_unlock(&t->mu); if (t->cb) { for (i = 0; i < nslices && process_read(t, slices[i]); i++) ; } + lock(t); + t->parsing = 0; + while ((s = stream_list_remove_head(t, MAYBE_FINISH_READ_AFTER_PARSE))) { + maybe_finish_read(t, s, 0); + } + while ((s = stream_list_remove_head(t, PARSER_CHECK_WINDOW_UPDATES_AFTER_PARSE))) { + maybe_join_window_updates(t, s); + } + while ((s = stream_list_remove_head(t, OTHER_CHECK_WINDOW_UPDATES_AFTER_PARSE))) { + maybe_join_window_updates(t, s); + } + while ((s = stream_list_remove_head(t, NEW_OUTGOING_WINDOW))) { + int was_window_empty = s->outgoing_window <= 0; + FLOWCTL_TRACE(t, s, outgoing, s->id, s->outgoing_window_update); + s->outgoing_window += s->outgoing_window_update; + s->outgoing_window_update = 0; + /* if this window update makes outgoing ops writable again, + flag that */ + if (was_window_empty && s->outgoing_sopb && + s->outgoing_sopb->nops > 0) { + stream_list_join(t, s, WRITABLE); + } + } + t->outgoing_window += t->outgoing_window_update; + t->outgoing_window_update = 0; + maybe_start_some_streams(t); unlock(t); keep_reading = 1; break; |