diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.c | 33 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/internal.h | 6 |
2 files changed, 29 insertions, 10 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index eb9ffd6773..790a567e1a 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -63,7 +63,7 @@ #define DEFAULT_WINDOW 65535 #define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024) #define MAX_WINDOW 0x7fffffffu - +#define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024) #define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024) #define MAX_CLIENT_STREAM_ID 0x7fffffffu @@ -272,6 +272,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, window -- this should by rights be 0 */ t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; t->sent_local_settings = 0; + t->write_buffer_size = DEFAULT_WINDOW; if (is_client) { grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string( @@ -322,6 +323,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_hpack_compressor_set_max_usable_size(&t->hpack_compressor, (uint32_t)value); } + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) { + t->write_buffer_size = (uint32_t)grpc_channel_arg_get_integer( + &channel_args->args[i], + (grpc_integer_options){0, 0, MAX_WRITE_BUFFER_SIZE}); } else { static const struct { const char *channel_arg_name; @@ -896,15 +902,22 @@ static bool contains_non_ok_status(grpc_metadata_batch *batch) { return false; } +static void maybe_become_writable_due_to_send_msg(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s) { + if (s->id != 0 && (!s->write_buffering || + s->flow_controlled_buffer.length > t->write_buffer_size)) { + grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message"); + } +} + static void add_fetched_slice_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s) { s->fetched_send_message_length += (uint32_t)GRPC_SLICE_LENGTH(s->fetching_slice); grpc_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice); - if (s->id != 0) { - grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message"); - } + maybe_become_writable_due_to_send_msg(exec_ctx, t, s); } static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, @@ -1099,14 +1112,13 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, (int64_t)len; s->complete_fetch_covered_by_poller = op->covered_by_poller; if (flags & GRPC_WRITE_BUFFER_HINT) { - /* allow up to 64kb to be buffered */ - /* TODO(ctiller): make this configurable */ - s->next_message_end_offset -= 65536; + s->next_message_end_offset -= t->write_buffer_size; + s->write_buffering = true; + } else { + s->write_buffering = false; } continue_fetching_send_locked(exec_ctx, t, s); - if (s->id != 0) { - grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message"); - } + maybe_become_writable_due_to_send_msg(exec_ctx, t, s); } } @@ -1115,6 +1127,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; s->send_trailing_metadata_finished = add_closure_barrier(on_complete); s->send_trailing_metadata = op->send_trailing_metadata; + s->write_buffering = false; const size_t metadata_size = grpc_metadata_batch_size(op->send_trailing_metadata); const size_t metadata_peer_limit = diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 43fd10f0fd..ee5edc92df 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -249,6 +249,9 @@ struct grpc_chttp2_transport { int64_t announce_incoming_window; /** how much window would we like to have for incoming_window */ uint32_t connection_window_target; + /** how much data are we willing to buffer when the WRITE_BUFFER_HINT is set? + */ + uint32_t write_buffer_size; /** have we seen a goaway */ uint8_t seen_goaway; @@ -403,6 +406,9 @@ struct grpc_chttp2_stream { /** Has this stream seen an error. If true, then pending incoming frames can be thrown away. */ bool seen_error; + /** Are we buffering writes on this stream? If yes, we won't become writable + until there's enough queued up in the flow_controlled_buffer */ + bool write_buffering; /** the error that resulted in this stream being read-closed */ grpc_error *read_closed_error; |