aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c33
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h6
2 files changed, 29 insertions, 10 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index eb9ffd6773..790a567e1a 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -63,7 +63,7 @@
#define DEFAULT_WINDOW 65535
#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
#define MAX_WINDOW 0x7fffffffu
-
+#define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
#define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024)
#define MAX_CLIENT_STREAM_ID 0x7fffffffu
@@ -272,6 +272,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
window -- this should by rights be 0 */
t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
t->sent_local_settings = 0;
+ t->write_buffer_size = DEFAULT_WINDOW;
if (is_client) {
grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string(
@@ -322,6 +323,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_hpack_compressor_set_max_usable_size(&t->hpack_compressor,
(uint32_t)value);
}
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) {
+ t->write_buffer_size = (uint32_t)grpc_channel_arg_get_integer(
+ &channel_args->args[i],
+ (grpc_integer_options){0, 0, MAX_WRITE_BUFFER_SIZE});
} else {
static const struct {
const char *channel_arg_name;
@@ -896,15 +902,22 @@ static bool contains_non_ok_status(grpc_metadata_batch *batch) {
return false;
}
+static void maybe_become_writable_due_to_send_msg(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s) {
+ if (s->id != 0 && (!s->write_buffering ||
+ s->flow_controlled_buffer.length > t->write_buffer_size)) {
+ grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message");
+ }
+}
+
static void add_fetched_slice_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
s->fetched_send_message_length +=
(uint32_t)GRPC_SLICE_LENGTH(s->fetching_slice);
grpc_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice);
- if (s->id != 0) {
- grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message");
- }
+ maybe_become_writable_due_to_send_msg(exec_ctx, t, s);
}
static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx,
@@ -1099,14 +1112,13 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
(int64_t)len;
s->complete_fetch_covered_by_poller = op->covered_by_poller;
if (flags & GRPC_WRITE_BUFFER_HINT) {
- /* allow up to 64kb to be buffered */
- /* TODO(ctiller): make this configurable */
- s->next_message_end_offset -= 65536;
+ s->next_message_end_offset -= t->write_buffer_size;
+ s->write_buffering = true;
+ } else {
+ s->write_buffering = false;
}
continue_fetching_send_locked(exec_ctx, t, s);
- if (s->id != 0) {
- grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message");
- }
+ maybe_become_writable_due_to_send_msg(exec_ctx, t, s);
}
}
@@ -1115,6 +1127,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
s->send_trailing_metadata = op->send_trailing_metadata;
+ s->write_buffering = false;
const size_t metadata_size =
grpc_metadata_batch_size(op->send_trailing_metadata);
const size_t metadata_peer_limit =
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 43fd10f0fd..ee5edc92df 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -249,6 +249,9 @@ struct grpc_chttp2_transport {
int64_t announce_incoming_window;
/** how much window would we like to have for incoming_window */
uint32_t connection_window_target;
+ /** how much data are we willing to buffer when the WRITE_BUFFER_HINT is set?
+ */
+ uint32_t write_buffer_size;
/** have we seen a goaway */
uint8_t seen_goaway;
@@ -403,6 +406,9 @@ struct grpc_chttp2_stream {
/** Has this stream seen an error.
If true, then pending incoming frames can be thrown away. */
bool seen_error;
+ /** Are we buffering writes on this stream? If yes, we won't become writable
+ until there's enough queued up in the flow_controlled_buffer */
+ bool write_buffering;
/** the error that resulted in this stream being read-closed */
grpc_error *read_closed_error;