diff options
Diffstat (limited to 'src/core/ext/transport/chttp2/transport/chttp2_transport.c')
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.c | 390 |
1 files changed, 242 insertions, 148 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 15f486d676..fa18f5a725 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -124,6 +124,21 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error); +static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, + grpc_error *error); +static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, + grpc_error *error); + +static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_error *error); +static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_ping_type ping_type, + grpc_closure *on_initiate, + grpc_closure *on_complete); + +#define DEFAULT_MIN_TIME_BETWEEN_PINGS_MS 0 +#define DEFAULT_MAX_PINGS_BETWEEN_DATA 3 + /******************************************************************************* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ @@ -155,16 +170,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, grpc_combiner_destroy(exec_ctx, t->combiner); - /* callback remaining pings: they're not allowed to call into the transpot, - and maybe they hold resources that need to be freed */ - while (t->pings.next != &t->pings) { - grpc_chttp2_outstanding_ping *ping = t->pings.next; - grpc_closure_sched(exec_ctx, ping->on_recv, - GRPC_ERROR_CREATE("Transport closed")); - ping->next->prev = ping->prev; - ping->prev->next = ping->next; - gpr_free(ping); - } + cancel_pings(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed")); while (t->write_cb_pool) { grpc_chttp2_write_cb *next = t->write_cb_pool->next; @@ -172,6 +178,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, t->write_cb_pool = next; } + gpr_free(t->ping_acks); gpr_free(t->peer_string); gpr_free(t); } @@ -224,10 +231,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->is_client = is_client; t->outgoing_window = DEFAULT_WINDOW; t->incoming_window = DEFAULT_WINDOW; - t->stream_lookahead = DEFAULT_WINDOW; - t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET; - t->ping_counter = 1; - t->pings.next = t->pings.prev = &t->pings; t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; t->is_first_frame = true; grpc_connectivity_state_init( @@ -248,6 +251,22 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_closure_init(&t->destructive_reclaimer_locked, destructive_reclaimer_locked, t, grpc_combiner_scheduler(t->combiner, false)); + grpc_closure_init(&t->start_bdp_ping_locked, start_bdp_ping_locked, t, + grpc_combiner_scheduler(t->combiner, false)); + grpc_closure_init(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t, + grpc_combiner_scheduler(t->combiner, false)); + + grpc_bdp_estimator_init(&t->bdp_estimator, t->peer_string); + t->last_pid_update = gpr_now(GPR_CLOCK_MONOTONIC); + grpc_pid_controller_init( + &t->pid_controller, + (grpc_pid_controller_args){.gain_p = 4, + .gain_i = 8, + .gain_d = 0, + .initial_control_value = log2(DEFAULT_WINDOW), + .min_control_value = -1, + .max_control_value = 22, + .integral_range = 10}); grpc_chttp2_goaway_parser_init(&t->goaway_parser); grpc_chttp2_hpack_parser_init(exec_ctx, &t->hpack_parser); @@ -290,6 +309,12 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, DEFAULT_MAX_HEADER_LIST_SIZE); + t->ping_policy = (grpc_chttp2_repeated_ping_policy){ + .max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA, + .min_time_between_pings = + gpr_time_from_millis(DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, GPR_TIMESPAN), + }; + if (channel_args) { for (i = 0; i < channel_args->num_args; i++) { if (0 == strcmp(channel_args->args[i].key, @@ -307,14 +332,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES)) { - const grpc_integer_options options = {-1, 5, INT_MAX}; - const int value = - grpc_channel_arg_get_integer(&channel_args->args[i], options); - if (value >= 0) { - t->stream_lookahead = (uint32_t)value; - } - } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) { const grpc_integer_options options = {-1, 0, INT_MAX}; const int value = @@ -324,6 +341,19 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, (uint32_t)value); } } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) { + t->ping_policy.max_pings_without_data = grpc_channel_arg_get_integer( + &channel_args->args[i], + (grpc_integer_options){DEFAULT_MAX_PINGS_BETWEEN_DATA, 0, INT_MAX}); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_MIN_TIME_BETWEEN_PINGS_MS)) { + t->ping_policy.min_time_between_pings = gpr_time_from_millis( + grpc_channel_arg_get_integer( + &channel_args->args[i], + (grpc_integer_options){DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, 0, + INT_MAX}), + GPR_TIMESPAN); + } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) { t->write_buffer_size = (uint32_t)grpc_channel_arg_get_integer( &channel_args->args[i], @@ -334,24 +364,26 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_setting_id setting_id; grpc_integer_options integer_options; bool availability[2] /* server, client */; - } settings_map[] = { - {GRPC_ARG_MAX_CONCURRENT_STREAMS, - GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, - {-1, 0, INT_MAX}, - {true, false}}, - {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER, - GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE, - {-1, 0, INT_MAX}, - {true, true}}, - {GRPC_ARG_MAX_METADATA_SIZE, - GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, - {-1, 0, INT_MAX}, - {true, true}}, - {GRPC_ARG_HTTP2_MAX_FRAME_SIZE, - GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, - {-1, 16384, 16777215}, - {true, true}}, - }; + } settings_map[] = {{GRPC_ARG_MAX_CONCURRENT_STREAMS, + GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, + {-1, 0, INT32_MAX}, + {true, false}}, + {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER, + GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE, + {-1, 0, INT32_MAX}, + {true, true}}, + {GRPC_ARG_MAX_METADATA_SIZE, + GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, + {-1, 0, INT32_MAX}, + {true, true}}, + {GRPC_ARG_HTTP2_MAX_FRAME_SIZE, + GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, + {-1, 16384, 16777215}, + {true, true}}, + {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES, + GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, + {-1, 5, INT32_MAX}, + {true, true}}}; for (j = 0; j < (int)GPR_ARRAY_SIZE(settings_map); j++) { if (0 == strcmp(channel_args->args[i].key, settings_map[j].channel_arg_name)) { @@ -374,6 +406,9 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } + t->ping_state.pings_before_data_required = + t->ping_policy.max_pings_without_data; + grpc_chttp2_initiate_write(exec_ctx, t, false, "init"); post_benign_reclaimer(exec_ctx, t); } @@ -425,6 +460,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close"); } end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error)); + cancel_pings(exec_ctx, t, GRPC_ERROR_REF(error)); } GRPC_ERROR_UNREF(error); } @@ -475,11 +511,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, if (server_data) { s->id = (uint32_t)(uintptr_t)server_data; - s->outgoing_window = t->settings[GRPC_PEER_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - s->incoming_window = s->max_recv_bytes = - t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; *t->accepting_stream = s; grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); post_destructive_reclaimer(exec_ctx, t); @@ -508,6 +539,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, } grpc_chttp2_list_remove_stalled_by_transport(t, s); + grpc_chttp2_list_remove_stalled_by_stream(t, s); for (int i = 0; i < STREAM_LIST_COUNT; i++) { if (s->included[i]) { @@ -647,13 +679,21 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, GPR_TIMER_END("grpc_chttp2_initiate_write", 0); } -void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s, bool covered_by_poller, - const char *reason) { +void grpc_chttp2_become_writable( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, + grpc_chttp2_stream_write_type stream_write_type, const char *reason) { if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) { GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become"); - grpc_chttp2_initiate_write(exec_ctx, t, covered_by_poller, reason); + } + switch (stream_write_type) { + case GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK: + break; + case GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED: + grpc_chttp2_initiate_write(exec_ctx, t, true, reason); + break; + case GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED: + grpc_chttp2_initiate_write(exec_ctx, t, false, reason); + break; } } @@ -781,7 +821,6 @@ void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx *exec_ctx, static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { grpc_chttp2_stream *s; - uint32_t stream_incoming_window; /* start streams where we have free grpc_chttp2_stream ids and free * concurrency */ while (t->next_stream_id <= MAX_CLIENT_STREAM_ID && @@ -804,15 +843,11 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx, "no_more_stream_ids"); } - s->outgoing_window = t->settings[GRPC_PEER_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - s->incoming_window = stream_incoming_window = - t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - s->max_recv_bytes = GPR_MAX(stream_incoming_window, s->max_recv_bytes); grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); post_destructive_reclaimer(exec_ctx, t); - grpc_chttp2_become_writable(exec_ctx, t, s, true, "new_stream"); + grpc_chttp2_become_writable(exec_ctx, t, s, + GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED, + "new_stream"); } /* cancel out streams that will never be started */ while (t->next_stream_id >= MAX_CLIENT_STREAM_ID && @@ -907,7 +942,9 @@ static void maybe_become_writable_due_to_send_msg(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s) { if (s->id != 0 && (!s->write_buffering || s->flow_controlled_buffer.length > t->write_buffer_size)) { - grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message"); + grpc_chttp2_become_writable(exec_ctx, t, s, + GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED, + "op.send_message"); } } @@ -1069,7 +1106,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } } else { GPR_ASSERT(s->id != 0); - grpc_chttp2_become_writable(exec_ctx, t, s, true, + grpc_chttp2_become_writable(exec_ctx, t, s, + GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED, "op.send_initial_metadata"); } } else { @@ -1160,7 +1198,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } else if (s->id != 0) { /* TODO(ctiller): check if there's flow control for any outstanding bytes before going writable */ - grpc_chttp2_become_writable(exec_ctx, t, s, true, + grpc_chttp2_become_writable(exec_ctx, t, s, + GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED, "op.send_trailing_metadata"); } } @@ -1179,8 +1218,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, s->recv_message = op->recv_message; if (s->id != 0 && (s->incoming_frames.head == NULL || s->incoming_frames.head->is_tail)) { - incoming_byte_stream_update_flow_control(exec_ctx, t, s, - t->stream_lookahead, 0); + incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5, 0); } grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); } @@ -1224,43 +1262,46 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, GPR_TIMER_END("perform_stream_op", 0); } +static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_error *error) { + /* callback remaining pings: they're not allowed to call into the transpot, + and maybe they hold resources that need to be freed */ + for (size_t i = 0; i < GRPC_CHTTP2_PING_TYPE_COUNT; i++) { + grpc_chttp2_ping_queue *pq = &t->ping_queues[i]; + for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) { + grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error)); + grpc_closure_list_sched(exec_ctx, &pq->lists[j]); + } + } + GRPC_ERROR_UNREF(error); +} + static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_closure *on_recv) { - grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p)); - p->next = &t->pings; - p->prev = p->next->prev; - p->prev->next = p->next->prev = p; - p->id[0] = (uint8_t)((t->ping_counter >> 56) & 0xff); - p->id[1] = (uint8_t)((t->ping_counter >> 48) & 0xff); - p->id[2] = (uint8_t)((t->ping_counter >> 40) & 0xff); - p->id[3] = (uint8_t)((t->ping_counter >> 32) & 0xff); - p->id[4] = (uint8_t)((t->ping_counter >> 24) & 0xff); - p->id[5] = (uint8_t)((t->ping_counter >> 16) & 0xff); - p->id[6] = (uint8_t)((t->ping_counter >> 8) & 0xff); - p->id[7] = (uint8_t)(t->ping_counter & 0xff); - t->ping_counter++; - p->on_recv = on_recv; - grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_ping_create(0, p->id)); - grpc_chttp2_initiate_write(exec_ctx, t, true, "send_ping"); + grpc_chttp2_ping_type ping_type, + grpc_closure *on_initiate, grpc_closure *on_ack) { + grpc_chttp2_ping_queue *pq = &t->ping_queues[ping_type]; + grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate, + GRPC_ERROR_NONE); + if (grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack, + GRPC_ERROR_NONE)) { + grpc_chttp2_initiate_write(exec_ctx, t, false, "send_ping"); + } } void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - const uint8_t *opaque_8bytes) { - grpc_chttp2_outstanding_ping *ping; - for (ping = t->pings.next; ping != &t->pings; ping = ping->next) { - if (0 == memcmp(opaque_8bytes, ping->id, 8)) { - grpc_closure_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE); - ping->next->prev = ping->prev; - ping->prev->next = ping->next; - gpr_free(ping); - return; - } + uint64_t id) { + grpc_chttp2_ping_queue *pq = + &t->ping_queues[id % GRPC_CHTTP2_PING_TYPE_COUNT]; + if (pq->inflight_id != id) { + char *from = grpc_endpoint_get_peer(t->ep); + gpr_log(GPR_DEBUG, "Unknown ping response from %s: %" PRIx64, from, id); + gpr_free(from); + return; + } + grpc_closure_list_sched(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); + if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) { + grpc_chttp2_initiate_write(exec_ctx, t, false, "continue_pings"); } - char *msg = gpr_dump((const char *)opaque_8bytes, 8, GPR_DUMP_HEX); - char *from = grpc_endpoint_get_peer(t->ep); - gpr_log(GPR_DEBUG, "Unknown ping response from %s: %s", from, msg); - gpr_free(from); - gpr_free(msg); } static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -1308,7 +1349,8 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, } if (op->send_ping) { - send_ping_locked(exec_ctx, t, op->send_ping); + send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, NULL, + op->send_ping); } if (close_transport != GRPC_ERROR_NONE) { @@ -1733,34 +1775,28 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GRPC_ERROR_UNREF(error); } -/** update window from a settings change */ -typedef struct { - grpc_chttp2_transport *t; - grpc_exec_ctx *exec_ctx; -} update_global_window_args; +/******************************************************************************* + * INPUT PROCESSING - PARSING + */ -static void update_global_window(void *args, uint32_t id, void *stream) { - update_global_window_args *a = args; - grpc_chttp2_transport *t = a->t; - grpc_chttp2_stream *s = stream; - int was_zero; - int is_zero; - int64_t initial_window_update = t->initial_window_update; - - if (initial_window_update > 0) { - was_zero = s->outgoing_window <= 0; - GRPC_CHTTP2_FLOW_CREDIT_STREAM("settings", t, s, outgoing_window, - initial_window_update); - is_zero = s->outgoing_window <= 0; - - if (was_zero && !is_zero) { - grpc_chttp2_become_writable(a->exec_ctx, t, s, true, - "update_global_window"); - } +static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + double bdp_dbl) { + uint32_t bdp; + if (bdp_dbl <= 0) { + bdp = 0; + } else if (bdp_dbl > UINT32_MAX) { + bdp = UINT32_MAX; } else { - GRPC_CHTTP2_FLOW_DEBIT_STREAM("settings", t, s, outgoing_window, - -initial_window_update); + bdp = (uint32_t)(bdp_dbl); + } + int64_t delta = + (int64_t)bdp - + (int64_t)t->settings[GRPC_LOCAL_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + if (delta == 0 || (bdp != 0 && delta > -1024 && delta < 1024)) { + return; } + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, bdp); } /******************************************************************************* @@ -1802,6 +1838,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, GPR_TIMER_BEGIN("reading_action_locked", 0); grpc_chttp2_transport *t = tp; + bool need_bdp_ping = false; GRPC_ERROR_REF(error); @@ -1819,9 +1856,14 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE, GRPC_ERROR_NONE}; for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) { + if (grpc_bdp_estimator_add_incoming_bytes( + &t->bdp_estimator, + (int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i]))) { + need_bdp_ping = true; + } errors[1] = grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]); - }; + } if (errors[1] != GRPC_ERROR_NONE) { errors[2] = try_http_parsing(exec_ctx, t); GRPC_ERROR_UNREF(error); @@ -1835,21 +1877,16 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, GPR_TIMER_BEGIN("post_parse_locked", 0); if (t->initial_window_update != 0) { - update_global_window_args args = {t, exec_ctx}; - grpc_chttp2_stream_map_for_each(&t->stream_map, update_global_window, - &args); + if (t->initial_window_update > 0) { + grpc_chttp2_stream *s; + while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) { + grpc_chttp2_become_writable( + exec_ctx, t, s, GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED, + "unstalled"); + } + } t->initial_window_update = 0; } - /* handle higher level things */ - if (t->incoming_window < t->connection_window_target * 3 / 4) { - int64_t announce_bytes = t->connection_window_target - t->incoming_window; - GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", t, announce_incoming_window, - announce_bytes); - GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", t, incoming_window, - announce_bytes); - grpc_chttp2_initiate_write(exec_ctx, t, false, "global incoming window"); - } - GPR_TIMER_END("post_parse_locked", 0); } @@ -1870,6 +1907,35 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, if (keep_reading) { grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->read_action_locked); + + if (need_bdp_ping) { + GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); + grpc_bdp_estimator_schedule_ping(&t->bdp_estimator); + send_ping_locked(exec_ctx, t, + GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE, + &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked); + } + + int64_t estimate = -1; + if (grpc_bdp_estimator_get_estimate(&t->bdp_estimator, &estimate)) { + double target = 1 + log2((double)estimate); + double memory_pressure = grpc_resource_quota_get_memory_pressure( + grpc_resource_user_quota(grpc_endpoint_get_resource_user(t->ep))); + if (memory_pressure > 0.8) { + target *= 1 - GPR_MIN(1, (memory_pressure - 0.8) / 0.1); + } + double bdp_error = target - grpc_pid_controller_last(&t->pid_controller); + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + gpr_timespec dt_timespec = gpr_time_sub(now, t->last_pid_update); + double dt = (double)dt_timespec.tv_sec + dt_timespec.tv_nsec * 1e-9; + if (dt > 0.1) { + dt = 0.1; + } + double log2_bdp_guess = + grpc_pid_controller_update(&t->pid_controller, bdp_error, dt); + update_bdp(exec_ctx, t, pow(2, log2_bdp_guess)); + t->last_pid_update = now; + } GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading"); } else { GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action"); @@ -1882,6 +1948,26 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, GPR_TIMER_END("reading_action_locked", 0); } +static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, + grpc_error *error) { + grpc_chttp2_transport *t = tp; + if (grpc_http_trace) { + gpr_log(GPR_DEBUG, "%s: Start BDP ping", t->peer_string); + } + grpc_bdp_estimator_start_ping(&t->bdp_estimator); +} + +static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, + grpc_error *error) { + grpc_chttp2_transport *t = tp; + if (grpc_http_trace) { + gpr_log(GPR_DEBUG, "%s: Complete BDP ping", t->peer_string); + } + grpc_bdp_estimator_complete_ping(&t->bdp_estimator); + + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); +} + /******************************************************************************* * CALLBACK LOOP */ @@ -1932,10 +2018,12 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx, size_t max_size_hint, size_t have_already) { uint32_t max_recv_bytes; + uint32_t initial_window_size = + t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; /* clamp max recv hint to an allowable size */ - if (max_size_hint >= UINT32_MAX - t->stream_lookahead) { - max_recv_bytes = UINT32_MAX - t->stream_lookahead; + if (max_size_hint >= UINT32_MAX - initial_window_size) { + max_recv_bytes = UINT32_MAX - initial_window_size; } else { max_recv_bytes = (uint32_t)max_size_hint; } @@ -1948,20 +2036,26 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx, } /* add some small lookahead to keep pipelines flowing */ - GPR_ASSERT(max_recv_bytes <= UINT32_MAX - t->stream_lookahead); - max_recv_bytes += t->stream_lookahead; - if (s->max_recv_bytes < max_recv_bytes) { - uint32_t add_max_recv_bytes = max_recv_bytes - s->max_recv_bytes; - bool new_window_write_is_covered_by_poller = - s->max_recv_bytes < have_already; - GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, max_recv_bytes, - add_max_recv_bytes); - GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, incoming_window, + GPR_ASSERT(max_recv_bytes <= UINT32_MAX - initial_window_size); + if (s->incoming_window_delta < max_recv_bytes && !s->read_closed) { + uint32_t add_max_recv_bytes = + (uint32_t)(max_recv_bytes - s->incoming_window_delta); + grpc_chttp2_stream_write_type write_type = + GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED; + if (s->incoming_window_delta + initial_window_size < + (int64_t)have_already) { + write_type = GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED; + } + GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, incoming_window_delta, add_max_recv_bytes); GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, announce_window, add_max_recv_bytes); - grpc_chttp2_become_writable(exec_ctx, t, s, - new_window_write_is_covered_by_poller, + if ((int64_t)s->incoming_window_delta + (int64_t)initial_window_size - + (int64_t)s->announce_window > + (int64_t)initial_window_size / 2) { + write_type = GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK; + } + grpc_chttp2_become_writable(exec_ctx, t, s, write_type, "read_incoming_stream"); } } |