aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/transport/chttp2/writing.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/transport/chttp2/writing.c')
-rw-r--r--src/core/transport/chttp2/writing.c311
1 files changed, 140 insertions, 171 deletions
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index e6f169c280..edf40f54fb 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -39,205 +39,174 @@
#include "src/core/transport/chttp2/http2_errors.h"
-static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing);
+static void finalize_outbuf (grpc_chttp2_transport_writing * transport_writing);
-int grpc_chttp2_unlocking_check_writes(
- grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_transport_writing *transport_writing) {
+int
+grpc_chttp2_unlocking_check_writes (grpc_chttp2_transport_global * transport_global, grpc_chttp2_transport_writing * transport_writing)
+{
grpc_chttp2_stream_global *stream_global;
grpc_chttp2_stream_writing *stream_writing;
grpc_chttp2_stream_global *first_reinserted_stream = NULL;
gpr_uint32 window_delta;
/* simple writes are queued to qbuf, and flushed here */
- gpr_slice_buffer_swap(&transport_global->qbuf, &transport_writing->outbuf);
- GPR_ASSERT(transport_global->qbuf.count == 0);
-
- if (transport_global->dirtied_local_settings &&
- !transport_global->sent_local_settings) {
- gpr_slice_buffer_add(
- &transport_writing->outbuf,
- grpc_chttp2_settings_create(
- transport_global->settings[GRPC_SENT_SETTINGS],
- transport_global->settings[GRPC_LOCAL_SETTINGS],
- transport_global->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
- transport_global->force_send_settings = 0;
- transport_global->dirtied_local_settings = 0;
- transport_global->sent_local_settings = 1;
- }
+ gpr_slice_buffer_swap (&transport_global->qbuf, &transport_writing->outbuf);
+ GPR_ASSERT (transport_global->qbuf.count == 0);
+
+ if (transport_global->dirtied_local_settings && !transport_global->sent_local_settings)
+ {
+ gpr_slice_buffer_add (&transport_writing->outbuf, grpc_chttp2_settings_create (transport_global->settings[GRPC_SENT_SETTINGS], transport_global->settings[GRPC_LOCAL_SETTINGS], transport_global->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
+ transport_global->force_send_settings = 0;
+ transport_global->dirtied_local_settings = 0;
+ transport_global->sent_local_settings = 1;
+ }
/* for each grpc_chttp2_stream that's become writable, frame it's data
(according to available window sizes) and add to the output buffer */
- while (grpc_chttp2_list_pop_writable_stream(
- transport_global, transport_writing, &stream_global, &stream_writing)) {
- if (stream_global == first_reinserted_stream) {
- /* prevent infinite loop */
- grpc_chttp2_list_add_first_writable_stream(transport_global,
- stream_global);
- break;
- }
-
- stream_writing->id = stream_global->id;
- stream_writing->send_closed = GRPC_DONT_SEND_CLOSED;
-
- if (stream_global->outgoing_sopb) {
- window_delta = grpc_chttp2_preencode(
- stream_global->outgoing_sopb->ops,
- &stream_global->outgoing_sopb->nops,
- (gpr_uint32)GPR_MIN(GPR_MIN(transport_global->outgoing_window,
- stream_global->outgoing_window),
- GPR_UINT32_MAX),
- &stream_writing->sopb);
- GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT(
- "write", transport_global, outgoing_window, -(gpr_int64)window_delta);
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global,
- outgoing_window,
- -(gpr_int64)window_delta);
- transport_global->outgoing_window -= window_delta;
- stream_global->outgoing_window -= window_delta;
-
- if (stream_global->write_state == GRPC_WRITE_STATE_QUEUED_CLOSE &&
- stream_global->outgoing_sopb->nops == 0) {
- if (!transport_global->is_client && !stream_global->read_closed) {
- stream_writing->send_closed = GRPC_SEND_CLOSED_WITH_RST_STREAM;
- } else {
- stream_writing->send_closed = GRPC_SEND_CLOSED;
- }
- }
-
- if (stream_global->outgoing_window > 0 &&
- stream_global->outgoing_sopb->nops != 0) {
- grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
- if (first_reinserted_stream == NULL &&
- transport_global->outgoing_window == 0) {
- first_reinserted_stream = stream_global;
- }
- }
- }
-
- if (!stream_global->read_closed &&
- stream_global->unannounced_incoming_window > 0) {
- GPR_ASSERT(stream_writing->announce_window == 0);
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
- "write", transport_writing, stream_writing, announce_window,
- stream_global->unannounced_incoming_window);
- stream_writing->announce_window =
- stream_global->unannounced_incoming_window;
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
- "write", transport_global, stream_global, incoming_window,
- stream_global->unannounced_incoming_window);
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
- "write", transport_global, stream_global, unannounced_incoming_window,
- -(gpr_int64)stream_global->unannounced_incoming_window);
- stream_global->incoming_window +=
- stream_global->unannounced_incoming_window;
- stream_global->unannounced_incoming_window = 0;
- grpc_chttp2_list_add_incoming_window_updated(transport_global,
- stream_global);
- stream_global->writing_now |= GRPC_CHTTP2_WRITING_WINDOW;
- }
- if (stream_writing->sopb.nops > 0 ||
- stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
- stream_global->writing_now |= GRPC_CHTTP2_WRITING_DATA;
+ while (grpc_chttp2_list_pop_writable_stream (transport_global, transport_writing, &stream_global, &stream_writing))
+ {
+ if (stream_global == first_reinserted_stream)
+ {
+ /* prevent infinite loop */
+ grpc_chttp2_list_add_first_writable_stream (transport_global, stream_global);
+ break;
+ }
+
+ stream_writing->id = stream_global->id;
+ stream_writing->send_closed = GRPC_DONT_SEND_CLOSED;
+
+ if (stream_global->outgoing_sopb)
+ {
+ window_delta = grpc_chttp2_preencode (stream_global->outgoing_sopb->ops, &stream_global->outgoing_sopb->nops, (gpr_uint32) GPR_MIN (GPR_MIN (transport_global->outgoing_window, stream_global->outgoing_window), GPR_UINT32_MAX), &stream_writing->sopb);
+ GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT ("write", transport_global, outgoing_window, -(gpr_int64) window_delta);
+ GRPC_CHTTP2_FLOWCTL_TRACE_STREAM ("write", transport_global, stream_global, outgoing_window, -(gpr_int64) window_delta);
+ transport_global->outgoing_window -= window_delta;
+ stream_global->outgoing_window -= window_delta;
+
+ if (stream_global->write_state == GRPC_WRITE_STATE_QUEUED_CLOSE && stream_global->outgoing_sopb->nops == 0)
+ {
+ if (!transport_global->is_client && !stream_global->read_closed)
+ {
+ stream_writing->send_closed = GRPC_SEND_CLOSED_WITH_RST_STREAM;
+ }
+ else
+ {
+ stream_writing->send_closed = GRPC_SEND_CLOSED;
+ }
+ }
+
+ if (stream_global->outgoing_window > 0 && stream_global->outgoing_sopb->nops != 0)
+ {
+ grpc_chttp2_list_add_writable_stream (transport_global, stream_global);
+ if (first_reinserted_stream == NULL && transport_global->outgoing_window == 0)
+ {
+ first_reinserted_stream = stream_global;
+ }
+ }
+ }
+
+ if (!stream_global->read_closed && stream_global->unannounced_incoming_window > 0)
+ {
+ GPR_ASSERT (stream_writing->announce_window == 0);
+ GRPC_CHTTP2_FLOWCTL_TRACE_STREAM ("write", transport_writing, stream_writing, announce_window, stream_global->unannounced_incoming_window);
+ stream_writing->announce_window = stream_global->unannounced_incoming_window;
+ GRPC_CHTTP2_FLOWCTL_TRACE_STREAM ("write", transport_global, stream_global, incoming_window, stream_global->unannounced_incoming_window);
+ GRPC_CHTTP2_FLOWCTL_TRACE_STREAM ("write", transport_global, stream_global, unannounced_incoming_window, -(gpr_int64) stream_global->unannounced_incoming_window);
+ stream_global->incoming_window += stream_global->unannounced_incoming_window;
+ stream_global->unannounced_incoming_window = 0;
+ grpc_chttp2_list_add_incoming_window_updated (transport_global, stream_global);
+ stream_global->writing_now |= GRPC_CHTTP2_WRITING_WINDOW;
+ }
+ if (stream_writing->sopb.nops > 0 || stream_writing->send_closed != GRPC_DONT_SEND_CLOSED)
+ {
+ stream_global->writing_now |= GRPC_CHTTP2_WRITING_DATA;
+ }
+ if (stream_global->writing_now != 0)
+ {
+ grpc_chttp2_list_add_writing_stream (transport_writing, stream_writing);
+ }
}
- if (stream_global->writing_now != 0) {
- grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
- }
- }
/* if the grpc_chttp2_transport is ready to send a window update, do so here
also; 3/4 is a magic number that will likely get tuned soon */
- if (transport_global->incoming_window <
- transport_global->connection_window_target * 3 / 4) {
- window_delta = transport_global->connection_window_target -
- transport_global->incoming_window;
- gpr_slice_buffer_add(&transport_writing->outbuf,
- grpc_chttp2_window_update_create(0, window_delta));
- GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT("write", transport_global,
- incoming_window, window_delta);
- transport_global->incoming_window += window_delta;
- }
-
- return transport_writing->outbuf.count > 0 ||
- grpc_chttp2_list_have_writing_streams(transport_writing);
+ if (transport_global->incoming_window < transport_global->connection_window_target * 3 / 4)
+ {
+ window_delta = transport_global->connection_window_target - transport_global->incoming_window;
+ gpr_slice_buffer_add (&transport_writing->outbuf, grpc_chttp2_window_update_create (0, window_delta));
+ GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT ("write", transport_global, incoming_window, window_delta);
+ transport_global->incoming_window += window_delta;
+ }
+
+ return transport_writing->outbuf.count > 0 || grpc_chttp2_list_have_writing_streams (transport_writing);
}
-void grpc_chttp2_perform_writes(
- grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint,
- grpc_closure_list *closure_list) {
- GPR_ASSERT(transport_writing->outbuf.count > 0 ||
- grpc_chttp2_list_have_writing_streams(transport_writing));
+void
+grpc_chttp2_perform_writes (grpc_chttp2_transport_writing * transport_writing, grpc_endpoint * endpoint, grpc_closure_list * closure_list)
+{
+ GPR_ASSERT (transport_writing->outbuf.count > 0 || grpc_chttp2_list_have_writing_streams (transport_writing));
- finalize_outbuf(transport_writing);
+ finalize_outbuf (transport_writing);
- GPR_ASSERT(transport_writing->outbuf.count > 0);
- GPR_ASSERT(endpoint);
+ GPR_ASSERT (transport_writing->outbuf.count > 0);
+ GPR_ASSERT (endpoint);
- grpc_endpoint_write(endpoint, &transport_writing->outbuf,
- &transport_writing->done_cb, closure_list);
+ grpc_endpoint_write (endpoint, &transport_writing->outbuf, &transport_writing->done_cb, closure_list);
}
-static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) {
+static void
+finalize_outbuf (grpc_chttp2_transport_writing * transport_writing)
+{
grpc_chttp2_stream_writing *stream_writing;
- while (
- grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) {
- if (stream_writing->sopb.nops > 0 ||
- stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
- grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops,
- stream_writing->send_closed != GRPC_DONT_SEND_CLOSED,
- stream_writing->id,
- &transport_writing->hpack_compressor,
- &transport_writing->outbuf);
- stream_writing->sopb.nops = 0;
+ while (grpc_chttp2_list_pop_writing_stream (transport_writing, &stream_writing))
+ {
+ if (stream_writing->sopb.nops > 0 || stream_writing->send_closed != GRPC_DONT_SEND_CLOSED)
+ {
+ grpc_chttp2_encode (stream_writing->sopb.ops, stream_writing->sopb.nops, stream_writing->send_closed != GRPC_DONT_SEND_CLOSED, stream_writing->id, &transport_writing->hpack_compressor, &transport_writing->outbuf);
+ stream_writing->sopb.nops = 0;
+ }
+ if (stream_writing->announce_window > 0)
+ {
+ gpr_slice_buffer_add (&transport_writing->outbuf, grpc_chttp2_window_update_create (stream_writing->id, stream_writing->announce_window));
+ GRPC_CHTTP2_FLOWCTL_TRACE_STREAM ("write", transport_writing, stream_writing, announce_window, -(gpr_int64) stream_writing->announce_window);
+ stream_writing->announce_window = 0;
+ }
+ if (stream_writing->send_closed == GRPC_SEND_CLOSED_WITH_RST_STREAM)
+ {
+ gpr_slice_buffer_add (&transport_writing->outbuf, grpc_chttp2_rst_stream_create (stream_writing->id, GRPC_CHTTP2_NO_ERROR));
+ }
+ grpc_chttp2_list_add_written_stream (transport_writing, stream_writing);
}
- if (stream_writing->announce_window > 0) {
- gpr_slice_buffer_add(
- &transport_writing->outbuf,
- grpc_chttp2_window_update_create(stream_writing->id,
- stream_writing->announce_window));
- GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
- "write", transport_writing, stream_writing, announce_window,
- -(gpr_int64)stream_writing->announce_window);
- stream_writing->announce_window = 0;
- }
- if (stream_writing->send_closed == GRPC_SEND_CLOSED_WITH_RST_STREAM) {
- gpr_slice_buffer_add(&transport_writing->outbuf,
- grpc_chttp2_rst_stream_create(stream_writing->id,
- GRPC_CHTTP2_NO_ERROR));
- }
- grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
- }
}
-void grpc_chttp2_cleanup_writing(
- grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_transport_writing *transport_writing,
- grpc_closure_list *closure_list) {
+void
+grpc_chttp2_cleanup_writing (grpc_chttp2_transport_global * transport_global, grpc_chttp2_transport_writing * transport_writing, grpc_closure_list * closure_list)
+{
grpc_chttp2_stream_writing *stream_writing;
grpc_chttp2_stream_global *stream_global;
- while (grpc_chttp2_list_pop_written_stream(
- transport_global, transport_writing, &stream_global, &stream_writing)) {
- GPR_ASSERT(stream_global->writing_now != 0);
- if (stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
- stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE;
- if (!transport_global->is_client) {
- stream_global->read_closed = 1;
- }
- }
- if (stream_global->writing_now & GRPC_CHTTP2_WRITING_DATA) {
- if (stream_global->outgoing_sopb != NULL &&
- stream_global->outgoing_sopb->nops == 0) {
- GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_QUEUED_CLOSE);
- stream_global->outgoing_sopb = NULL;
- grpc_closure_list_add(closure_list, stream_global->send_done_closure,
- 1);
- }
+ while (grpc_chttp2_list_pop_written_stream (transport_global, transport_writing, &stream_global, &stream_writing))
+ {
+ GPR_ASSERT (stream_global->writing_now != 0);
+ if (stream_writing->send_closed != GRPC_DONT_SEND_CLOSED)
+ {
+ stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE;
+ if (!transport_global->is_client)
+ {
+ stream_global->read_closed = 1;
+ }
+ }
+ if (stream_global->writing_now & GRPC_CHTTP2_WRITING_DATA)
+ {
+ if (stream_global->outgoing_sopb != NULL && stream_global->outgoing_sopb->nops == 0)
+ {
+ GPR_ASSERT (stream_global->write_state != GRPC_WRITE_STATE_QUEUED_CLOSE);
+ stream_global->outgoing_sopb = NULL;
+ grpc_closure_list_add (closure_list, stream_global->send_done_closure, 1);
+ }
+ }
+ stream_global->writing_now = 0;
+ grpc_chttp2_list_add_read_write_state_changed (transport_global, stream_global);
}
- stream_global->writing_now = 0;
- grpc_chttp2_list_add_read_write_state_changed(transport_global,
- stream_global);
- }
- gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf);
+ gpr_slice_buffer_reset_and_unref (&transport_writing->outbuf);
}