diff options
Diffstat (limited to 'src/core/transport/chttp2/writing.c')
-rw-r--r-- | src/core/transport/chttp2/writing.c | 311 |
1 files changed, 140 insertions, 171 deletions
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index e6f169c280..edf40f54fb 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -39,205 +39,174 @@ #include "src/core/transport/chttp2/http2_errors.h" -static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing); +static void finalize_outbuf (grpc_chttp2_transport_writing * transport_writing); -int grpc_chttp2_unlocking_check_writes( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_transport_writing *transport_writing) { +int +grpc_chttp2_unlocking_check_writes (grpc_chttp2_transport_global * transport_global, grpc_chttp2_transport_writing * transport_writing) +{ grpc_chttp2_stream_global *stream_global; grpc_chttp2_stream_writing *stream_writing; grpc_chttp2_stream_global *first_reinserted_stream = NULL; gpr_uint32 window_delta; /* simple writes are queued to qbuf, and flushed here */ - gpr_slice_buffer_swap(&transport_global->qbuf, &transport_writing->outbuf); - GPR_ASSERT(transport_global->qbuf.count == 0); - - if (transport_global->dirtied_local_settings && - !transport_global->sent_local_settings) { - gpr_slice_buffer_add( - &transport_writing->outbuf, - grpc_chttp2_settings_create( - transport_global->settings[GRPC_SENT_SETTINGS], - transport_global->settings[GRPC_LOCAL_SETTINGS], - transport_global->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS)); - transport_global->force_send_settings = 0; - transport_global->dirtied_local_settings = 0; - transport_global->sent_local_settings = 1; - } + gpr_slice_buffer_swap (&transport_global->qbuf, &transport_writing->outbuf); + GPR_ASSERT (transport_global->qbuf.count == 0); + + if (transport_global->dirtied_local_settings && !transport_global->sent_local_settings) + { + gpr_slice_buffer_add (&transport_writing->outbuf, grpc_chttp2_settings_create (transport_global->settings[GRPC_SENT_SETTINGS], transport_global->settings[GRPC_LOCAL_SETTINGS], transport_global->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS)); + transport_global->force_send_settings = 0; + transport_global->dirtied_local_settings = 0; + transport_global->sent_local_settings = 1; + } /* for each grpc_chttp2_stream that's become writable, frame it's data (according to available window sizes) and add to the output buffer */ - while (grpc_chttp2_list_pop_writable_stream( - transport_global, transport_writing, &stream_global, &stream_writing)) { - if (stream_global == first_reinserted_stream) { - /* prevent infinite loop */ - grpc_chttp2_list_add_first_writable_stream(transport_global, - stream_global); - break; - } - - stream_writing->id = stream_global->id; - stream_writing->send_closed = GRPC_DONT_SEND_CLOSED; - - if (stream_global->outgoing_sopb) { - window_delta = grpc_chttp2_preencode( - stream_global->outgoing_sopb->ops, - &stream_global->outgoing_sopb->nops, - (gpr_uint32)GPR_MIN(GPR_MIN(transport_global->outgoing_window, - stream_global->outgoing_window), - GPR_UINT32_MAX), - &stream_writing->sopb); - GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT( - "write", transport_global, outgoing_window, -(gpr_int64)window_delta); - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global, - outgoing_window, - -(gpr_int64)window_delta); - transport_global->outgoing_window -= window_delta; - stream_global->outgoing_window -= window_delta; - - if (stream_global->write_state == GRPC_WRITE_STATE_QUEUED_CLOSE && - stream_global->outgoing_sopb->nops == 0) { - if (!transport_global->is_client && !stream_global->read_closed) { - stream_writing->send_closed = GRPC_SEND_CLOSED_WITH_RST_STREAM; - } else { - stream_writing->send_closed = GRPC_SEND_CLOSED; - } - } - - if (stream_global->outgoing_window > 0 && - stream_global->outgoing_sopb->nops != 0) { - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); - if (first_reinserted_stream == NULL && - transport_global->outgoing_window == 0) { - first_reinserted_stream = stream_global; - } - } - } - - if (!stream_global->read_closed && - stream_global->unannounced_incoming_window > 0) { - GPR_ASSERT(stream_writing->announce_window == 0); - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( - "write", transport_writing, stream_writing, announce_window, - stream_global->unannounced_incoming_window); - stream_writing->announce_window = - stream_global->unannounced_incoming_window; - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( - "write", transport_global, stream_global, incoming_window, - stream_global->unannounced_incoming_window); - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( - "write", transport_global, stream_global, unannounced_incoming_window, - -(gpr_int64)stream_global->unannounced_incoming_window); - stream_global->incoming_window += - stream_global->unannounced_incoming_window; - stream_global->unannounced_incoming_window = 0; - grpc_chttp2_list_add_incoming_window_updated(transport_global, - stream_global); - stream_global->writing_now |= GRPC_CHTTP2_WRITING_WINDOW; - } - if (stream_writing->sopb.nops > 0 || - stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) { - stream_global->writing_now |= GRPC_CHTTP2_WRITING_DATA; + while (grpc_chttp2_list_pop_writable_stream (transport_global, transport_writing, &stream_global, &stream_writing)) + { + if (stream_global == first_reinserted_stream) + { + /* prevent infinite loop */ + grpc_chttp2_list_add_first_writable_stream (transport_global, stream_global); + break; + } + + stream_writing->id = stream_global->id; + stream_writing->send_closed = GRPC_DONT_SEND_CLOSED; + + if (stream_global->outgoing_sopb) + { + window_delta = grpc_chttp2_preencode (stream_global->outgoing_sopb->ops, &stream_global->outgoing_sopb->nops, (gpr_uint32) GPR_MIN (GPR_MIN (transport_global->outgoing_window, stream_global->outgoing_window), GPR_UINT32_MAX), &stream_writing->sopb); + GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT ("write", transport_global, outgoing_window, -(gpr_int64) window_delta); + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM ("write", transport_global, stream_global, outgoing_window, -(gpr_int64) window_delta); + transport_global->outgoing_window -= window_delta; + stream_global->outgoing_window -= window_delta; + + if (stream_global->write_state == GRPC_WRITE_STATE_QUEUED_CLOSE && stream_global->outgoing_sopb->nops == 0) + { + if (!transport_global->is_client && !stream_global->read_closed) + { + stream_writing->send_closed = GRPC_SEND_CLOSED_WITH_RST_STREAM; + } + else + { + stream_writing->send_closed = GRPC_SEND_CLOSED; + } + } + + if (stream_global->outgoing_window > 0 && stream_global->outgoing_sopb->nops != 0) + { + grpc_chttp2_list_add_writable_stream (transport_global, stream_global); + if (first_reinserted_stream == NULL && transport_global->outgoing_window == 0) + { + first_reinserted_stream = stream_global; + } + } + } + + if (!stream_global->read_closed && stream_global->unannounced_incoming_window > 0) + { + GPR_ASSERT (stream_writing->announce_window == 0); + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM ("write", transport_writing, stream_writing, announce_window, stream_global->unannounced_incoming_window); + stream_writing->announce_window = stream_global->unannounced_incoming_window; + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM ("write", transport_global, stream_global, incoming_window, stream_global->unannounced_incoming_window); + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM ("write", transport_global, stream_global, unannounced_incoming_window, -(gpr_int64) stream_global->unannounced_incoming_window); + stream_global->incoming_window += stream_global->unannounced_incoming_window; + stream_global->unannounced_incoming_window = 0; + grpc_chttp2_list_add_incoming_window_updated (transport_global, stream_global); + stream_global->writing_now |= GRPC_CHTTP2_WRITING_WINDOW; + } + if (stream_writing->sopb.nops > 0 || stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) + { + stream_global->writing_now |= GRPC_CHTTP2_WRITING_DATA; + } + if (stream_global->writing_now != 0) + { + grpc_chttp2_list_add_writing_stream (transport_writing, stream_writing); + } } - if (stream_global->writing_now != 0) { - grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); - } - } /* if the grpc_chttp2_transport is ready to send a window update, do so here also; 3/4 is a magic number that will likely get tuned soon */ - if (transport_global->incoming_window < - transport_global->connection_window_target * 3 / 4) { - window_delta = transport_global->connection_window_target - - transport_global->incoming_window; - gpr_slice_buffer_add(&transport_writing->outbuf, - grpc_chttp2_window_update_create(0, window_delta)); - GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT("write", transport_global, - incoming_window, window_delta); - transport_global->incoming_window += window_delta; - } - - return transport_writing->outbuf.count > 0 || - grpc_chttp2_list_have_writing_streams(transport_writing); + if (transport_global->incoming_window < transport_global->connection_window_target * 3 / 4) + { + window_delta = transport_global->connection_window_target - transport_global->incoming_window; + gpr_slice_buffer_add (&transport_writing->outbuf, grpc_chttp2_window_update_create (0, window_delta)); + GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT ("write", transport_global, incoming_window, window_delta); + transport_global->incoming_window += window_delta; + } + + return transport_writing->outbuf.count > 0 || grpc_chttp2_list_have_writing_streams (transport_writing); } -void grpc_chttp2_perform_writes( - grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint, - grpc_closure_list *closure_list) { - GPR_ASSERT(transport_writing->outbuf.count > 0 || - grpc_chttp2_list_have_writing_streams(transport_writing)); +void +grpc_chttp2_perform_writes (grpc_chttp2_transport_writing * transport_writing, grpc_endpoint * endpoint, grpc_closure_list * closure_list) +{ + GPR_ASSERT (transport_writing->outbuf.count > 0 || grpc_chttp2_list_have_writing_streams (transport_writing)); - finalize_outbuf(transport_writing); + finalize_outbuf (transport_writing); - GPR_ASSERT(transport_writing->outbuf.count > 0); - GPR_ASSERT(endpoint); + GPR_ASSERT (transport_writing->outbuf.count > 0); + GPR_ASSERT (endpoint); - grpc_endpoint_write(endpoint, &transport_writing->outbuf, - &transport_writing->done_cb, closure_list); + grpc_endpoint_write (endpoint, &transport_writing->outbuf, &transport_writing->done_cb, closure_list); } -static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) { +static void +finalize_outbuf (grpc_chttp2_transport_writing * transport_writing) +{ grpc_chttp2_stream_writing *stream_writing; - while ( - grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) { - if (stream_writing->sopb.nops > 0 || - stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) { - grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops, - stream_writing->send_closed != GRPC_DONT_SEND_CLOSED, - stream_writing->id, - &transport_writing->hpack_compressor, - &transport_writing->outbuf); - stream_writing->sopb.nops = 0; + while (grpc_chttp2_list_pop_writing_stream (transport_writing, &stream_writing)) + { + if (stream_writing->sopb.nops > 0 || stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) + { + grpc_chttp2_encode (stream_writing->sopb.ops, stream_writing->sopb.nops, stream_writing->send_closed != GRPC_DONT_SEND_CLOSED, stream_writing->id, &transport_writing->hpack_compressor, &transport_writing->outbuf); + stream_writing->sopb.nops = 0; + } + if (stream_writing->announce_window > 0) + { + gpr_slice_buffer_add (&transport_writing->outbuf, grpc_chttp2_window_update_create (stream_writing->id, stream_writing->announce_window)); + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM ("write", transport_writing, stream_writing, announce_window, -(gpr_int64) stream_writing->announce_window); + stream_writing->announce_window = 0; + } + if (stream_writing->send_closed == GRPC_SEND_CLOSED_WITH_RST_STREAM) + { + gpr_slice_buffer_add (&transport_writing->outbuf, grpc_chttp2_rst_stream_create (stream_writing->id, GRPC_CHTTP2_NO_ERROR)); + } + grpc_chttp2_list_add_written_stream (transport_writing, stream_writing); } - if (stream_writing->announce_window > 0) { - gpr_slice_buffer_add( - &transport_writing->outbuf, - grpc_chttp2_window_update_create(stream_writing->id, - stream_writing->announce_window)); - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( - "write", transport_writing, stream_writing, announce_window, - -(gpr_int64)stream_writing->announce_window); - stream_writing->announce_window = 0; - } - if (stream_writing->send_closed == GRPC_SEND_CLOSED_WITH_RST_STREAM) { - gpr_slice_buffer_add(&transport_writing->outbuf, - grpc_chttp2_rst_stream_create(stream_writing->id, - GRPC_CHTTP2_NO_ERROR)); - } - grpc_chttp2_list_add_written_stream(transport_writing, stream_writing); - } } -void grpc_chttp2_cleanup_writing( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_transport_writing *transport_writing, - grpc_closure_list *closure_list) { +void +grpc_chttp2_cleanup_writing (grpc_chttp2_transport_global * transport_global, grpc_chttp2_transport_writing * transport_writing, grpc_closure_list * closure_list) +{ grpc_chttp2_stream_writing *stream_writing; grpc_chttp2_stream_global *stream_global; - while (grpc_chttp2_list_pop_written_stream( - transport_global, transport_writing, &stream_global, &stream_writing)) { - GPR_ASSERT(stream_global->writing_now != 0); - if (stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) { - stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE; - if (!transport_global->is_client) { - stream_global->read_closed = 1; - } - } - if (stream_global->writing_now & GRPC_CHTTP2_WRITING_DATA) { - if (stream_global->outgoing_sopb != NULL && - stream_global->outgoing_sopb->nops == 0) { - GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_QUEUED_CLOSE); - stream_global->outgoing_sopb = NULL; - grpc_closure_list_add(closure_list, stream_global->send_done_closure, - 1); - } + while (grpc_chttp2_list_pop_written_stream (transport_global, transport_writing, &stream_global, &stream_writing)) + { + GPR_ASSERT (stream_global->writing_now != 0); + if (stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) + { + stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE; + if (!transport_global->is_client) + { + stream_global->read_closed = 1; + } + } + if (stream_global->writing_now & GRPC_CHTTP2_WRITING_DATA) + { + if (stream_global->outgoing_sopb != NULL && stream_global->outgoing_sopb->nops == 0) + { + GPR_ASSERT (stream_global->write_state != GRPC_WRITE_STATE_QUEUED_CLOSE); + stream_global->outgoing_sopb = NULL; + grpc_closure_list_add (closure_list, stream_global->send_done_closure, 1); + } + } + stream_global->writing_now = 0; + grpc_chttp2_list_add_read_write_state_changed (transport_global, stream_global); } - stream_global->writing_now = 0; - grpc_chttp2_list_add_read_write_state_changed(transport_global, - stream_global); - } - gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf); + gpr_slice_buffer_reset_and_unref (&transport_writing->outbuf); } |