diff options
author | Mark D. Roth <roth@google.com> | 2017-02-06 12:11:16 -0800 |
---|---|---|
committer | Mark D. Roth <roth@google.com> | 2017-02-06 12:11:16 -0800 |
commit | c6449d9f2b48b7b1259ac7307c6a2e4798d10695 (patch) | |
tree | b4dff4c179cdaea8af4bd906da732f66336c5599 /src/core/ext/transport/chttp2 | |
parent | ae89af9cb48a65c67f9d2008615fb7f4f7005c65 (diff) | |
parent | d6d2677637fee5d2988dae08cee1b909d822c650 (diff) |
Merge remote-tracking branch 'upstream/master' into canonicalize_server_uri
Diffstat (limited to 'src/core/ext/transport/chttp2')
9 files changed, 520 insertions, 229 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 15f486d676..fa18f5a725 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -124,6 +124,21 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error); +static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, + grpc_error *error); +static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, + grpc_error *error); + +static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_error *error); +static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_ping_type ping_type, + grpc_closure *on_initiate, + grpc_closure *on_complete); + +#define DEFAULT_MIN_TIME_BETWEEN_PINGS_MS 0 +#define DEFAULT_MAX_PINGS_BETWEEN_DATA 3 + /******************************************************************************* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ @@ -155,16 +170,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, grpc_combiner_destroy(exec_ctx, t->combiner); - /* callback remaining pings: they're not allowed to call into the transpot, - and maybe they hold resources that need to be freed */ - while (t->pings.next != &t->pings) { - grpc_chttp2_outstanding_ping *ping = t->pings.next; - grpc_closure_sched(exec_ctx, ping->on_recv, - GRPC_ERROR_CREATE("Transport closed")); - ping->next->prev = ping->prev; - ping->prev->next = ping->next; - gpr_free(ping); - } + cancel_pings(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed")); while (t->write_cb_pool) { grpc_chttp2_write_cb *next = t->write_cb_pool->next; @@ -172,6 +178,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, t->write_cb_pool = next; } + gpr_free(t->ping_acks); gpr_free(t->peer_string); gpr_free(t); } @@ -224,10 +231,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->is_client = is_client; t->outgoing_window = DEFAULT_WINDOW; t->incoming_window = DEFAULT_WINDOW; - t->stream_lookahead = DEFAULT_WINDOW; - t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET; - t->ping_counter = 1; - t->pings.next = t->pings.prev = &t->pings; t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; t->is_first_frame = true; grpc_connectivity_state_init( @@ -248,6 +251,22 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_closure_init(&t->destructive_reclaimer_locked, destructive_reclaimer_locked, t, grpc_combiner_scheduler(t->combiner, false)); + grpc_closure_init(&t->start_bdp_ping_locked, start_bdp_ping_locked, t, + grpc_combiner_scheduler(t->combiner, false)); + grpc_closure_init(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t, + grpc_combiner_scheduler(t->combiner, false)); + + grpc_bdp_estimator_init(&t->bdp_estimator, t->peer_string); + t->last_pid_update = gpr_now(GPR_CLOCK_MONOTONIC); + grpc_pid_controller_init( + &t->pid_controller, + (grpc_pid_controller_args){.gain_p = 4, + .gain_i = 8, + .gain_d = 0, + .initial_control_value = log2(DEFAULT_WINDOW), + .min_control_value = -1, + .max_control_value = 22, + .integral_range = 10}); grpc_chttp2_goaway_parser_init(&t->goaway_parser); grpc_chttp2_hpack_parser_init(exec_ctx, &t->hpack_parser); @@ -290,6 +309,12 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, DEFAULT_MAX_HEADER_LIST_SIZE); + t->ping_policy = (grpc_chttp2_repeated_ping_policy){ + .max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA, + .min_time_between_pings = + gpr_time_from_millis(DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, GPR_TIMESPAN), + }; + if (channel_args) { for (i = 0; i < channel_args->num_args; i++) { if (0 == strcmp(channel_args->args[i].key, @@ -307,14 +332,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES)) { - const grpc_integer_options options = {-1, 5, INT_MAX}; - const int value = - grpc_channel_arg_get_integer(&channel_args->args[i], options); - if (value >= 0) { - t->stream_lookahead = (uint32_t)value; - } - } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) { const grpc_integer_options options = {-1, 0, INT_MAX}; const int value = @@ -324,6 +341,19 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, (uint32_t)value); } } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) { + t->ping_policy.max_pings_without_data = grpc_channel_arg_get_integer( + &channel_args->args[i], + (grpc_integer_options){DEFAULT_MAX_PINGS_BETWEEN_DATA, 0, INT_MAX}); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_MIN_TIME_BETWEEN_PINGS_MS)) { + t->ping_policy.min_time_between_pings = gpr_time_from_millis( + grpc_channel_arg_get_integer( + &channel_args->args[i], + (grpc_integer_options){DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, 0, + INT_MAX}), + GPR_TIMESPAN); + } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) { t->write_buffer_size = (uint32_t)grpc_channel_arg_get_integer( &channel_args->args[i], @@ -334,24 +364,26 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_setting_id setting_id; grpc_integer_options integer_options; bool availability[2] /* server, client */; - } settings_map[] = { - {GRPC_ARG_MAX_CONCURRENT_STREAMS, - GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, - {-1, 0, INT_MAX}, - {true, false}}, - {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER, - GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE, - {-1, 0, INT_MAX}, - {true, true}}, - {GRPC_ARG_MAX_METADATA_SIZE, - GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, - {-1, 0, INT_MAX}, - {true, true}}, - {GRPC_ARG_HTTP2_MAX_FRAME_SIZE, - GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, - {-1, 16384, 16777215}, - {true, true}}, - }; + } settings_map[] = {{GRPC_ARG_MAX_CONCURRENT_STREAMS, + GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, + {-1, 0, INT32_MAX}, + {true, false}}, + {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER, + GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE, + {-1, 0, INT32_MAX}, + {true, true}}, + {GRPC_ARG_MAX_METADATA_SIZE, + GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, + {-1, 0, INT32_MAX}, + {true, true}}, + {GRPC_ARG_HTTP2_MAX_FRAME_SIZE, + GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, + {-1, 16384, 16777215}, + {true, true}}, + {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES, + GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, + {-1, 5, INT32_MAX}, + {true, true}}}; for (j = 0; j < (int)GPR_ARRAY_SIZE(settings_map); j++) { if (0 == strcmp(channel_args->args[i].key, settings_map[j].channel_arg_name)) { @@ -374,6 +406,9 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } + t->ping_state.pings_before_data_required = + t->ping_policy.max_pings_without_data; + grpc_chttp2_initiate_write(exec_ctx, t, false, "init"); post_benign_reclaimer(exec_ctx, t); } @@ -425,6 +460,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close"); } end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error)); + cancel_pings(exec_ctx, t, GRPC_ERROR_REF(error)); } GRPC_ERROR_UNREF(error); } @@ -475,11 +511,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, if (server_data) { s->id = (uint32_t)(uintptr_t)server_data; - s->outgoing_window = t->settings[GRPC_PEER_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - s->incoming_window = s->max_recv_bytes = - t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; *t->accepting_stream = s; grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); post_destructive_reclaimer(exec_ctx, t); @@ -508,6 +539,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, } grpc_chttp2_list_remove_stalled_by_transport(t, s); + grpc_chttp2_list_remove_stalled_by_stream(t, s); for (int i = 0; i < STREAM_LIST_COUNT; i++) { if (s->included[i]) { @@ -647,13 +679,21 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, GPR_TIMER_END("grpc_chttp2_initiate_write", 0); } -void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s, bool covered_by_poller, - const char *reason) { +void grpc_chttp2_become_writable( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, + grpc_chttp2_stream_write_type stream_write_type, const char *reason) { if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) { GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become"); - grpc_chttp2_initiate_write(exec_ctx, t, covered_by_poller, reason); + } + switch (stream_write_type) { + case GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK: + break; + case GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED: + grpc_chttp2_initiate_write(exec_ctx, t, true, reason); + break; + case GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED: + grpc_chttp2_initiate_write(exec_ctx, t, false, reason); + break; } } @@ -781,7 +821,6 @@ void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx *exec_ctx, static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { grpc_chttp2_stream *s; - uint32_t stream_incoming_window; /* start streams where we have free grpc_chttp2_stream ids and free * concurrency */ while (t->next_stream_id <= MAX_CLIENT_STREAM_ID && @@ -804,15 +843,11 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx, "no_more_stream_ids"); } - s->outgoing_window = t->settings[GRPC_PEER_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - s->incoming_window = stream_incoming_window = - t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - s->max_recv_bytes = GPR_MAX(stream_incoming_window, s->max_recv_bytes); grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); post_destructive_reclaimer(exec_ctx, t); - grpc_chttp2_become_writable(exec_ctx, t, s, true, "new_stream"); + grpc_chttp2_become_writable(exec_ctx, t, s, + GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED, + "new_stream"); } /* cancel out streams that will never be started */ while (t->next_stream_id >= MAX_CLIENT_STREAM_ID && @@ -907,7 +942,9 @@ static void maybe_become_writable_due_to_send_msg(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s) { if (s->id != 0 && (!s->write_buffering || s->flow_controlled_buffer.length > t->write_buffer_size)) { - grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message"); + grpc_chttp2_become_writable(exec_ctx, t, s, + GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED, + "op.send_message"); } } @@ -1069,7 +1106,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } } else { GPR_ASSERT(s->id != 0); - grpc_chttp2_become_writable(exec_ctx, t, s, true, + grpc_chttp2_become_writable(exec_ctx, t, s, + GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED, "op.send_initial_metadata"); } } else { @@ -1160,7 +1198,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } else if (s->id != 0) { /* TODO(ctiller): check if there's flow control for any outstanding bytes before going writable */ - grpc_chttp2_become_writable(exec_ctx, t, s, true, + grpc_chttp2_become_writable(exec_ctx, t, s, + GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED, "op.send_trailing_metadata"); } } @@ -1179,8 +1218,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, s->recv_message = op->recv_message; if (s->id != 0 && (s->incoming_frames.head == NULL || s->incoming_frames.head->is_tail)) { - incoming_byte_stream_update_flow_control(exec_ctx, t, s, - t->stream_lookahead, 0); + incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5, 0); } grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); } @@ -1224,43 +1262,46 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, GPR_TIMER_END("perform_stream_op", 0); } +static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_error *error) { + /* callback remaining pings: they're not allowed to call into the transpot, + and maybe they hold resources that need to be freed */ + for (size_t i = 0; i < GRPC_CHTTP2_PING_TYPE_COUNT; i++) { + grpc_chttp2_ping_queue *pq = &t->ping_queues[i]; + for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) { + grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error)); + grpc_closure_list_sched(exec_ctx, &pq->lists[j]); + } + } + GRPC_ERROR_UNREF(error); +} + static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_closure *on_recv) { - grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p)); - p->next = &t->pings; - p->prev = p->next->prev; - p->prev->next = p->next->prev = p; - p->id[0] = (uint8_t)((t->ping_counter >> 56) & 0xff); - p->id[1] = (uint8_t)((t->ping_counter >> 48) & 0xff); - p->id[2] = (uint8_t)((t->ping_counter >> 40) & 0xff); - p->id[3] = (uint8_t)((t->ping_counter >> 32) & 0xff); - p->id[4] = (uint8_t)((t->ping_counter >> 24) & 0xff); - p->id[5] = (uint8_t)((t->ping_counter >> 16) & 0xff); - p->id[6] = (uint8_t)((t->ping_counter >> 8) & 0xff); - p->id[7] = (uint8_t)(t->ping_counter & 0xff); - t->ping_counter++; - p->on_recv = on_recv; - grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_ping_create(0, p->id)); - grpc_chttp2_initiate_write(exec_ctx, t, true, "send_ping"); + grpc_chttp2_ping_type ping_type, + grpc_closure *on_initiate, grpc_closure *on_ack) { + grpc_chttp2_ping_queue *pq = &t->ping_queues[ping_type]; + grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate, + GRPC_ERROR_NONE); + if (grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack, + GRPC_ERROR_NONE)) { + grpc_chttp2_initiate_write(exec_ctx, t, false, "send_ping"); + } } void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - const uint8_t *opaque_8bytes) { - grpc_chttp2_outstanding_ping *ping; - for (ping = t->pings.next; ping != &t->pings; ping = ping->next) { - if (0 == memcmp(opaque_8bytes, ping->id, 8)) { - grpc_closure_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE); - ping->next->prev = ping->prev; - ping->prev->next = ping->next; - gpr_free(ping); - return; - } + uint64_t id) { + grpc_chttp2_ping_queue *pq = + &t->ping_queues[id % GRPC_CHTTP2_PING_TYPE_COUNT]; + if (pq->inflight_id != id) { + char *from = grpc_endpoint_get_peer(t->ep); + gpr_log(GPR_DEBUG, "Unknown ping response from %s: %" PRIx64, from, id); + gpr_free(from); + return; + } + grpc_closure_list_sched(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); + if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) { + grpc_chttp2_initiate_write(exec_ctx, t, false, "continue_pings"); } - char *msg = gpr_dump((const char *)opaque_8bytes, 8, GPR_DUMP_HEX); - char *from = grpc_endpoint_get_peer(t->ep); - gpr_log(GPR_DEBUG, "Unknown ping response from %s: %s", from, msg); - gpr_free(from); - gpr_free(msg); } static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -1308,7 +1349,8 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, } if (op->send_ping) { - send_ping_locked(exec_ctx, t, op->send_ping); + send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, NULL, + op->send_ping); } if (close_transport != GRPC_ERROR_NONE) { @@ -1733,34 +1775,28 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GRPC_ERROR_UNREF(error); } -/** update window from a settings change */ -typedef struct { - grpc_chttp2_transport *t; - grpc_exec_ctx *exec_ctx; -} update_global_window_args; +/******************************************************************************* + * INPUT PROCESSING - PARSING + */ -static void update_global_window(void *args, uint32_t id, void *stream) { - update_global_window_args *a = args; - grpc_chttp2_transport *t = a->t; - grpc_chttp2_stream *s = stream; - int was_zero; - int is_zero; - int64_t initial_window_update = t->initial_window_update; - - if (initial_window_update > 0) { - was_zero = s->outgoing_window <= 0; - GRPC_CHTTP2_FLOW_CREDIT_STREAM("settings", t, s, outgoing_window, - initial_window_update); - is_zero = s->outgoing_window <= 0; - - if (was_zero && !is_zero) { - grpc_chttp2_become_writable(a->exec_ctx, t, s, true, - "update_global_window"); - } +static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + double bdp_dbl) { + uint32_t bdp; + if (bdp_dbl <= 0) { + bdp = 0; + } else if (bdp_dbl > UINT32_MAX) { + bdp = UINT32_MAX; } else { - GRPC_CHTTP2_FLOW_DEBIT_STREAM("settings", t, s, outgoing_window, - -initial_window_update); + bdp = (uint32_t)(bdp_dbl); + } + int64_t delta = + (int64_t)bdp - + (int64_t)t->settings[GRPC_LOCAL_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + if (delta == 0 || (bdp != 0 && delta > -1024 && delta < 1024)) { + return; } + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, bdp); } /******************************************************************************* @@ -1802,6 +1838,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, GPR_TIMER_BEGIN("reading_action_locked", 0); grpc_chttp2_transport *t = tp; + bool need_bdp_ping = false; GRPC_ERROR_REF(error); @@ -1819,9 +1856,14 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE, GRPC_ERROR_NONE}; for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) { + if (grpc_bdp_estimator_add_incoming_bytes( + &t->bdp_estimator, + (int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i]))) { + need_bdp_ping = true; + } errors[1] = grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]); - }; + } if (errors[1] != GRPC_ERROR_NONE) { errors[2] = try_http_parsing(exec_ctx, t); GRPC_ERROR_UNREF(error); @@ -1835,21 +1877,16 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, GPR_TIMER_BEGIN("post_parse_locked", 0); if (t->initial_window_update != 0) { - update_global_window_args args = {t, exec_ctx}; - grpc_chttp2_stream_map_for_each(&t->stream_map, update_global_window, - &args); + if (t->initial_window_update > 0) { + grpc_chttp2_stream *s; + while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) { + grpc_chttp2_become_writable( + exec_ctx, t, s, GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED, + "unstalled"); + } + } t->initial_window_update = 0; } - /* handle higher level things */ - if (t->incoming_window < t->connection_window_target * 3 / 4) { - int64_t announce_bytes = t->connection_window_target - t->incoming_window; - GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", t, announce_incoming_window, - announce_bytes); - GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", t, incoming_window, - announce_bytes); - grpc_chttp2_initiate_write(exec_ctx, t, false, "global incoming window"); - } - GPR_TIMER_END("post_parse_locked", 0); } @@ -1870,6 +1907,35 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, if (keep_reading) { grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->read_action_locked); + + if (need_bdp_ping) { + GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); + grpc_bdp_estimator_schedule_ping(&t->bdp_estimator); + send_ping_locked(exec_ctx, t, + GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE, + &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked); + } + + int64_t estimate = -1; + if (grpc_bdp_estimator_get_estimate(&t->bdp_estimator, &estimate)) { + double target = 1 + log2((double)estimate); + double memory_pressure = grpc_resource_quota_get_memory_pressure( + grpc_resource_user_quota(grpc_endpoint_get_resource_user(t->ep))); + if (memory_pressure > 0.8) { + target *= 1 - GPR_MIN(1, (memory_pressure - 0.8) / 0.1); + } + double bdp_error = target - grpc_pid_controller_last(&t->pid_controller); + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + gpr_timespec dt_timespec = gpr_time_sub(now, t->last_pid_update); + double dt = (double)dt_timespec.tv_sec + dt_timespec.tv_nsec * 1e-9; + if (dt > 0.1) { + dt = 0.1; + } + double log2_bdp_guess = + grpc_pid_controller_update(&t->pid_controller, bdp_error, dt); + update_bdp(exec_ctx, t, pow(2, log2_bdp_guess)); + t->last_pid_update = now; + } GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading"); } else { GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action"); @@ -1882,6 +1948,26 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, GPR_TIMER_END("reading_action_locked", 0); } +static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, + grpc_error *error) { + grpc_chttp2_transport *t = tp; + if (grpc_http_trace) { + gpr_log(GPR_DEBUG, "%s: Start BDP ping", t->peer_string); + } + grpc_bdp_estimator_start_ping(&t->bdp_estimator); +} + +static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, + grpc_error *error) { + grpc_chttp2_transport *t = tp; + if (grpc_http_trace) { + gpr_log(GPR_DEBUG, "%s: Complete BDP ping", t->peer_string); + } + grpc_bdp_estimator_complete_ping(&t->bdp_estimator); + + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); +} + /******************************************************************************* * CALLBACK LOOP */ @@ -1932,10 +2018,12 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx, size_t max_size_hint, size_t have_already) { uint32_t max_recv_bytes; + uint32_t initial_window_size = + t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; /* clamp max recv hint to an allowable size */ - if (max_size_hint >= UINT32_MAX - t->stream_lookahead) { - max_recv_bytes = UINT32_MAX - t->stream_lookahead; + if (max_size_hint >= UINT32_MAX - initial_window_size) { + max_recv_bytes = UINT32_MAX - initial_window_size; } else { max_recv_bytes = (uint32_t)max_size_hint; } @@ -1948,20 +2036,26 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx, } /* add some small lookahead to keep pipelines flowing */ - GPR_ASSERT(max_recv_bytes <= UINT32_MAX - t->stream_lookahead); - max_recv_bytes += t->stream_lookahead; - if (s->max_recv_bytes < max_recv_bytes) { - uint32_t add_max_recv_bytes = max_recv_bytes - s->max_recv_bytes; - bool new_window_write_is_covered_by_poller = - s->max_recv_bytes < have_already; - GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, max_recv_bytes, - add_max_recv_bytes); - GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, incoming_window, + GPR_ASSERT(max_recv_bytes <= UINT32_MAX - initial_window_size); + if (s->incoming_window_delta < max_recv_bytes && !s->read_closed) { + uint32_t add_max_recv_bytes = + (uint32_t)(max_recv_bytes - s->incoming_window_delta); + grpc_chttp2_stream_write_type write_type = + GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED; + if (s->incoming_window_delta + initial_window_size < + (int64_t)have_already) { + write_type = GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED; + } + GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, incoming_window_delta, add_max_recv_bytes); GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, announce_window, add_max_recv_bytes); - grpc_chttp2_become_writable(exec_ctx, t, s, - new_window_write_is_covered_by_poller, + if ((int64_t)s->incoming_window_delta + (int64_t)initial_window_size - + (int64_t)s->announce_window > + (int64_t)initial_window_size / 2) { + write_type = GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK; + } + grpc_chttp2_become_writable(exec_ctx, t, s, write_type, "read_incoming_stream"); } } diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.c b/src/core/ext/transport/chttp2/transport/frame_ping.c index 7de5f6362d..f487533c41 100644 --- a/src/core/ext/transport/chttp2/transport/frame_ping.c +++ b/src/core/ext/transport/chttp2/transport/frame_ping.c @@ -40,7 +40,7 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> -grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint8_t *opaque_8bytes) { +grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint64_t opaque_8bytes) { grpc_slice slice = grpc_slice_malloc(9 + 8); uint8_t *p = GRPC_SLICE_START_PTR(slice); @@ -53,7 +53,14 @@ grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint8_t *opaque_8bytes) { *p++ = 0; *p++ = 0; *p++ = 0; - memcpy(p, opaque_8bytes, 8); + *p++ = (uint8_t)(opaque_8bytes >> 56); + *p++ = (uint8_t)(opaque_8bytes >> 48); + *p++ = (uint8_t)(opaque_8bytes >> 40); + *p++ = (uint8_t)(opaque_8bytes >> 32); + *p++ = (uint8_t)(opaque_8bytes >> 24); + *p++ = (uint8_t)(opaque_8bytes >> 16); + *p++ = (uint8_t)(opaque_8bytes >> 8); + *p++ = (uint8_t)(opaque_8bytes); return slice; } @@ -70,6 +77,7 @@ grpc_error *grpc_chttp2_ping_parser_begin_frame(grpc_chttp2_ping_parser *parser, } parser->byte = 0; parser->is_ack = flags; + parser->opaque_8bytes = 0; return GRPC_ERROR_NONE; } @@ -83,7 +91,7 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_ping_parser *p = parser; while (p->byte != 8 && cur != end) { - p->opaque_8bytes[p->byte] = *cur; + p->opaque_8bytes |= (((uint64_t)*cur) << (8 * p->byte)); cur++; p->byte++; } @@ -93,8 +101,12 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, if (p->is_ack) { grpc_chttp2_ack_ping(exec_ctx, t, p->opaque_8bytes); } else { - grpc_slice_buffer_add(&t->qbuf, - grpc_chttp2_ping_create(1, p->opaque_8bytes)); + if (t->ping_ack_count == t->ping_ack_capacity) { + t->ping_ack_capacity = GPR_MAX(t->ping_ack_capacity * 3 / 2, 3); + t->ping_acks = gpr_realloc( + t->ping_acks, t->ping_ack_capacity * sizeof(*t->ping_acks)); + } + t->ping_acks[t->ping_ack_count++] = p->opaque_8bytes; grpc_chttp2_initiate_write(exec_ctx, t, false, "ping response"); } } diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.h b/src/core/ext/transport/chttp2/transport/frame_ping.h index b9889e2d11..ef642465d7 100644 --- a/src/core/ext/transport/chttp2/transport/frame_ping.h +++ b/src/core/ext/transport/chttp2/transport/frame_ping.h @@ -41,10 +41,10 @@ typedef struct { uint8_t byte; uint8_t is_ack; - uint8_t opaque_8bytes[8]; + uint64_t opaque_8bytes; } grpc_chttp2_ping_parser; -grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint8_t *opaque_8bytes); +grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint64_t opaque_8bytes); grpc_error *grpc_chttp2_ping_parser_begin_frame(grpc_chttp2_ping_parser *parser, uint32_t length, uint8_t flags); diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.c b/src/core/ext/transport/chttp2/transport/frame_settings.c index be9b663ac1..82290e34cd 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.c +++ b/src/core/ext/transport/chttp2/transport/frame_settings.c @@ -236,7 +236,7 @@ grpc_error *grpc_chttp2_settings_parser_parse(grpc_exec_ctx *exec_ctx, void *p, } if (parser->id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE && parser->incoming_settings[parser->id] != parser->value) { - t->initial_window_update = + t->initial_window_update += (int64_t)parser->value - parser->incoming_settings[parser->id]; if (grpc_http_trace) { gpr_log(GPR_DEBUG, "adding %d for initial_window change", @@ -245,8 +245,9 @@ grpc_error *grpc_chttp2_settings_parser_parse(grpc_exec_ctx *exec_ctx, void *p, } parser->incoming_settings[parser->id] = parser->value; if (grpc_http_trace) { - gpr_log(GPR_DEBUG, "CHTTP2:%s: got setting %d = %d", - t->is_client ? "CLI" : "SVR", parser->id, parser->value); + gpr_log(GPR_DEBUG, "CHTTP2:%s:%s: got setting %d = %d", + t->is_client ? "CLI" : "SVR", t->peer_string, parser->id, + parser->value); } } else if (grpc_http_trace) { gpr_log(GPR_ERROR, "CHTTP2: Ignoring unknown setting %d (value %d)", diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.c b/src/core/ext/transport/chttp2/transport/frame_window_update.c index 31a31c2871..8fa0bb471a 100644 --- a/src/core/ext/transport/chttp2/transport/frame_window_update.c +++ b/src/core/ext/transport/chttp2/transport/frame_window_update.c @@ -110,13 +110,12 @@ grpc_error *grpc_chttp2_window_update_parser_parse( if (t->incoming_stream_id != 0) { if (s != NULL) { - bool was_zero = s->outgoing_window <= 0; - GRPC_CHTTP2_FLOW_CREDIT_STREAM("parse", t, s, outgoing_window, + GRPC_CHTTP2_FLOW_CREDIT_STREAM("parse", t, s, outgoing_window_delta, received_update); - bool is_zero = s->outgoing_window <= 0; - if (was_zero && !is_zero) { - grpc_chttp2_become_writable(exec_ctx, t, s, false, - "stream.read_flow_control"); + if (grpc_chttp2_list_remove_stalled_by_stream(t, s)) { + grpc_chttp2_become_writable( + exec_ctx, t, s, GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED, + "stream.read_flow_control"); } } } else { diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index ee5edc92df..1dabf9edba 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -50,7 +50,9 @@ #include "src/core/ext/transport/chttp2/transport/stream_map.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/transport/bdp_estimator.h" #include "src/core/lib/transport/connectivity_state.h" +#include "src/core/lib/transport/pid_controller.h" #include "src/core/lib/transport/transport_impl.h" /* streams are kept in various linked lists depending on what things need to @@ -59,6 +61,7 @@ typedef enum { GRPC_CHTTP2_LIST_WRITABLE, GRPC_CHTTP2_LIST_WRITING, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT, + GRPC_CHTTP2_LIST_STALLED_BY_STREAM, /** streams that are waiting to start because there are too many concurrent streams on the connection */ GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY, @@ -72,6 +75,34 @@ typedef enum { GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER, } grpc_chttp2_write_state; +typedef enum { + GRPC_CHTTP2_PING_ON_NEXT_WRITE = 0, + GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE, + GRPC_CHTTP2_PING_TYPE_COUNT /* must be last */ +} grpc_chttp2_ping_type; + +typedef enum { + GRPC_CHTTP2_PCL_INITIATE = 0, + GRPC_CHTTP2_PCL_NEXT, + GRPC_CHTTP2_PCL_INFLIGHT, + GRPC_CHTTP2_PCL_COUNT /* must be last */ +} grpc_chttp2_ping_closure_list; + +typedef struct { + grpc_closure_list lists[GRPC_CHTTP2_PCL_COUNT]; + uint64_t inflight_id; +} grpc_chttp2_ping_queue; + +typedef struct { + gpr_timespec min_time_between_pings; + int max_pings_without_data; +} grpc_chttp2_repeated_ping_policy; + +typedef struct { + gpr_timespec last_ping_sent_time; + int pings_before_data_required; +} grpc_chttp2_repeated_ping_state; + /* deframer state for the overall http2 stream of bytes */ typedef enum { /* prefix: one entry per http2 connection prefix byte */ @@ -144,14 +175,6 @@ typedef enum { GRPC_CHTTP2_GOAWAY_SENT, } grpc_chttp2_sent_goaway_state; -/* Outstanding ping request data */ -typedef struct grpc_chttp2_outstanding_ping { - uint8_t id[8]; - grpc_closure *on_recv; - struct grpc_chttp2_outstanding_ping *next; - struct grpc_chttp2_outstanding_ping *prev; -} grpc_chttp2_outstanding_ping; - typedef struct grpc_chttp2_write_cb { int64_t call_at_byte; grpc_closure *closure; @@ -271,16 +294,19 @@ struct grpc_chttp2_transport { copied to next_stream_id in parsing when parsing commences */ uint32_t next_stream_id; - /** how far to lookahead in a stream? */ - uint32_t stream_lookahead; - /** last new stream id */ uint32_t last_new_stream_id; - /** pings awaiting responses */ - grpc_chttp2_outstanding_ping pings; - /** next payload for an outgoing ping */ - uint64_t ping_counter; + /** ping queues for various ping insertion points */ + grpc_chttp2_ping_queue ping_queues[GRPC_CHTTP2_PING_TYPE_COUNT]; + grpc_chttp2_repeated_ping_policy ping_policy; + grpc_chttp2_repeated_ping_state ping_state; + uint64_t ping_ctr; /* unique id for pings */ + + /** ping acks */ + size_t ping_ack_count; + size_t ping_ack_capacity; + uint64_t *ping_acks; /** parser for headers */ grpc_chttp2_hpack_parser hpack_parser; @@ -324,6 +350,13 @@ struct grpc_chttp2_transport { grpc_chttp2_write_cb *write_cb_pool; + /* bdp estimator */ + grpc_bdp_estimator bdp_estimator; + grpc_pid_controller pid_controller; + grpc_closure start_bdp_ping_locked; + grpc_closure finish_bdp_ping_locked; + gpr_timespec last_pid_update; + /* if non-NULL, close the transport with this error when writes are finished */ grpc_error *close_transport_on_writes_finished; @@ -362,12 +395,10 @@ struct grpc_chttp2_stream { /** HTTP2 stream id for this stream, or zero if one has not been assigned */ uint32_t id; - /** window available for us to send to peer */ - int64_t outgoing_window; - /** The number of bytes the upper layers have offered to receive. - As the upper layer offers more bytes, this value increases. - As bytes are read, this value decreases. */ - uint32_t max_recv_bytes; + /** window available for us to send to peer, over or under the initial window + * size of the transport... ie: + * outgoing_window = outgoing_window_delta + transport.initial_window_size */ + int64_t outgoing_window_delta; /** things the upper layers would like to send */ grpc_metadata_batch *send_initial_metadata; grpc_closure *send_initial_metadata_finished; @@ -428,8 +459,10 @@ struct grpc_chttp2_stream { grpc_error *forced_close_error; /** how many header frames have we received? */ uint8_t header_frames_received; - /** window available for peer to send to us */ - int64_t incoming_window; + /** window available for peer to send to us (as a delta on + * transport.initial_window_size) + * incoming_window = incoming_window_delta + transport.initial_window_size */ + int64_t incoming_window_delta; /** parsing state for data frames */ grpc_chttp2_data_parser data_parser; /** number of bytes received - reset at end of parse thread execution */ @@ -478,36 +511,43 @@ bool grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s); /** Get a writable stream returns non-zero if there was a stream available */ -int grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport *t, - grpc_chttp2_stream **s); +bool grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport *t, + grpc_chttp2_stream **s); bool grpc_chttp2_list_remove_writable_stream( grpc_chttp2_transport *t, grpc_chttp2_stream *s) GRPC_MUST_USE_RESULT; bool grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s); -int grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport *t); -int grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport *t, - grpc_chttp2_stream **s); +bool grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport *t); +bool grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport *t, + grpc_chttp2_stream **s); void grpc_chttp2_list_add_written_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s); -int grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport *t, - grpc_chttp2_stream **s); +bool grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport *t, + grpc_chttp2_stream **s); void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport *t, grpc_chttp2_stream *s); -int grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t, - grpc_chttp2_stream **s); +bool grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t, + grpc_chttp2_stream **s); void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport *t, grpc_chttp2_stream *s); void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport *t, grpc_chttp2_stream *s); -int grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport *t, - grpc_chttp2_stream **s); +bool grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport *t, + grpc_chttp2_stream **s); void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport *t, grpc_chttp2_stream *s); +void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport *t, + grpc_chttp2_stream *s); +bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport *t, + grpc_chttp2_stream **s); +bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport *t, + grpc_chttp2_stream *s); + grpc_chttp2_stream *grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport *t, uint32_t id); grpc_chttp2_stream *grpc_chttp2_parsing_accept_stream(grpc_exec_ctx *exec_ctx, @@ -672,13 +712,23 @@ void grpc_chttp2_incoming_byte_stream_finished( grpc_error *error); void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - const uint8_t *opaque_8bytes); + uint64_t id); + +typedef enum { + /* don't initiate a transport write, but piggyback on the next one */ + GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK, + /* initiate a covered write */ + GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED, + /* initiate an uncovered write */ + GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED +} grpc_chttp2_stream_write_type; /** add a ref to the stream and add it to the writable list; ref will be dropped in writing.c */ void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_stream *s, bool covered_by_poller, + grpc_chttp2_stream *s, + grpc_chttp2_stream_write_type type, const char *reason); void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx, diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index f58cd696f9..24bd93067b 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -376,25 +376,45 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx, return err; } + uint32_t target_incoming_window = GPR_MAX( + t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + 1024); GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", t, incoming_window, incoming_frame_size); + if (t->incoming_window <= target_incoming_window / 2) { + grpc_chttp2_initiate_write(exec_ctx, t, false, "flow_control"); + } if (s != NULL) { - if (incoming_frame_size > s->incoming_window) { + if (incoming_frame_size > + s->incoming_window_delta + + t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]) { char *msg; gpr_asprintf(&msg, "frame of size %d overflows incoming window of %" PRId64, - t->incoming_frame_size, s->incoming_window); + t->incoming_frame_size, + s->incoming_window_delta + + t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); grpc_error *err = GRPC_ERROR_CREATE(msg); gpr_free(msg); return err; } - GRPC_CHTTP2_FLOW_DEBIT_STREAM("parse", t, s, incoming_window, + GRPC_CHTTP2_FLOW_DEBIT_STREAM("parse", t, s, incoming_window_delta, incoming_frame_size); + if ((int64_t)t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] + + (int64_t)s->incoming_window_delta - (int64_t)s->announce_window <= + (int64_t)t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] / + 2) { + grpc_chttp2_become_writable(exec_ctx, t, s, + GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED, + "window-update-required"); + } s->received_bytes += incoming_frame_size; - s->max_recv_bytes -= - (uint32_t)GPR_MIN(s->max_recv_bytes, incoming_frame_size); } return GRPC_ERROR_NONE; diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c index a60264cc51..078818fb18 100644 --- a/src/core/ext/transport/chttp2/transport/stream_lists.c +++ b/src/core/ext/transport/chttp2/transport/stream_lists.c @@ -37,14 +37,14 @@ /* core list management */ -static int stream_list_empty(grpc_chttp2_transport *t, - grpc_chttp2_stream_list_id id) { +static bool stream_list_empty(grpc_chttp2_transport *t, + grpc_chttp2_stream_list_id id) { return t->lists[id].head == NULL; } -static int stream_list_pop(grpc_chttp2_transport *t, - grpc_chttp2_stream **stream, - grpc_chttp2_stream_list_id id) { +static bool stream_list_pop(grpc_chttp2_transport *t, + grpc_chttp2_stream **stream, + grpc_chttp2_stream_list_id id) { grpc_chttp2_stream *s = t->lists[id].head; if (s) { grpc_chttp2_stream *new_head = s->links[id].next; @@ -124,8 +124,8 @@ bool grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport *t, return stream_list_add(t, s, GRPC_CHTTP2_LIST_WRITABLE); } -int grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport *t, - grpc_chttp2_stream **s) { +bool grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport *t, + grpc_chttp2_stream **s) { return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WRITABLE); } @@ -139,12 +139,12 @@ bool grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport *t, return stream_list_add(t, s, GRPC_CHTTP2_LIST_WRITING); } -int grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport *t) { +bool grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport *t) { return !stream_list_empty(t, GRPC_CHTTP2_LIST_WRITING); } -int grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport *t, - grpc_chttp2_stream **s) { +bool grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport *t, + grpc_chttp2_stream **s) { return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WRITING); } @@ -153,8 +153,8 @@ void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport *t, stream_list_add(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY); } -int grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t, - grpc_chttp2_stream **s) { +bool grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t, + grpc_chttp2_stream **s) { return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY); } @@ -168,8 +168,8 @@ void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport *t, stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); } -int grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport *t, - grpc_chttp2_stream **s) { +bool grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport *t, + grpc_chttp2_stream **s) { return stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); } @@ -177,3 +177,18 @@ void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); } + +void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport *t, + grpc_chttp2_stream *s) { + stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM); +} + +bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport *t, + grpc_chttp2_stream **s) { + return stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM); +} + +bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport *t, + grpc_chttp2_stream *s) { + return stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM); +} diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 148e3844b5..05e6f59947 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -56,6 +56,75 @@ static void finish_write_cb(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->write_cb_pool = cb; } +static void collapse_pings_from_into(grpc_chttp2_transport *t, + grpc_chttp2_ping_type ping_type, + grpc_chttp2_ping_queue *pq) { + for (size_t i = 0; i < GRPC_CHTTP2_PCL_COUNT; i++) { + grpc_closure_list_move(&t->ping_queues[ping_type].lists[i], &pq->lists[i]); + } +} + +static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_ping_type ping_type) { + grpc_chttp2_ping_queue *pq = &t->ping_queues[ping_type]; + if (grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) { + /* no ping needed: wait */ + return; + } + if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) { + /* ping already in-flight: wait */ + if (grpc_http_trace || grpc_bdp_estimator_trace) { + gpr_log(GPR_DEBUG, "Ping delayed [%p]: already pinging", t->peer_string); + } + return; + } + if (t->ping_state.pings_before_data_required == 0 && + t->ping_policy.max_pings_without_data != 0) { + /* need to send something of substance before sending a ping again */ + if (grpc_http_trace || grpc_bdp_estimator_trace) { + gpr_log(GPR_DEBUG, "Ping delayed [%p]: too many recent pings: %d/%d", + t->peer_string, t->ping_state.pings_before_data_required, + t->ping_policy.max_pings_without_data); + } + return; + } + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + gpr_timespec elapsed = gpr_time_sub(now, t->ping_state.last_ping_sent_time); + /*gpr_log(GPR_DEBUG, "elapsed:%d.%09d min:%d.%09d", (int)elapsed.tv_sec, + elapsed.tv_nsec, (int)t->ping_policy.min_time_between_pings.tv_sec, + (int)t->ping_policy.min_time_between_pings.tv_nsec);*/ + if (gpr_time_cmp(elapsed, t->ping_policy.min_time_between_pings) < 0) { + /* not enough elapsed time between successive pings */ + if (grpc_http_trace || grpc_bdp_estimator_trace) { + gpr_log(GPR_DEBUG, + "Ping delayed [%p]: not enough time elapsed since last ping", + t->peer_string); + } + return; + } + /* coalesce equivalent pings into this one */ + switch (ping_type) { + case GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE: + collapse_pings_from_into(t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, pq); + break; + case GRPC_CHTTP2_PING_ON_NEXT_WRITE: + break; + case GRPC_CHTTP2_PING_TYPE_COUNT: + GPR_UNREACHABLE_CODE(break); + } + pq->inflight_id = t->ping_ctr * GRPC_CHTTP2_PING_TYPE_COUNT + ping_type; + t->ping_ctr++; + grpc_closure_list_sched(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INITIATE]); + grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT], + &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); + grpc_slice_buffer_add(&t->outbuf, + grpc_chttp2_ping_create(false, pq->inflight_id)); + t->ping_state.last_ping_sent_time = now; + t->ping_state.pings_before_data_required -= + (t->ping_state.pings_before_data_required != 0); +} + static void update_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, int64_t send_bytes, grpc_chttp2_write_cb **list, grpc_error *error) { @@ -139,6 +208,8 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, s->sent_initial_metadata = true; sent_initial_metadata = true; now_writing = true; + t->ping_state.pings_before_data_required = + t->ping_policy.max_pings_without_data; } /* send any window updates */ if (s->announce_window > 0) { @@ -146,15 +217,22 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_add(&t->outbuf, grpc_chttp2_window_update_create( s->id, s->announce_window, &s->stats.outgoing)); + t->ping_state.pings_before_data_required = + t->ping_policy.max_pings_without_data; GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, announce_window, announce); } if (sent_initial_metadata) { /* send any body bytes, if allowed by flow control */ if (s->flow_controlled_buffer.length > 0) { - uint32_t max_outgoing = - (uint32_t)GPR_MIN(t->settings[GRPC_ACKED_SETTINGS] - [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], - GPR_MIN(s->outgoing_window, t->outgoing_window)); + uint32_t stream_outgoing_window = (uint32_t)GPR_MAX( + 0, + s->outgoing_window_delta + + (int64_t)t->settings[GRPC_PEER_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); + uint32_t max_outgoing = (uint32_t)GPR_MIN( + t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], + GPR_MIN(stream_outgoing_window, t->outgoing_window)); if (max_outgoing > 0) { uint32_t send_bytes = (uint32_t)GPR_MIN(max_outgoing, s->flow_controlled_buffer.length); @@ -167,10 +245,12 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, send_bytes, is_last_frame, &s->stats.outgoing, &t->outbuf); - GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window, + GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window_delta, send_bytes); GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window, send_bytes); + t->ping_state.pings_before_data_required = + t->ping_policy.max_pings_without_data; if (is_last_frame) { s->send_trailing_metadata = NULL; s->sent_trailing_metadata = true; @@ -189,6 +269,9 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, } else if (t->outgoing_window == 0) { grpc_chttp2_list_add_stalled_by_transport(t, s); now_writing = true; + } else if (stream_outgoing_window == 0) { + grpc_chttp2_list_add_stalled_by_stream(t, s); + now_writing = true; } } if (s->send_trailing_metadata != NULL && @@ -227,15 +310,32 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, /* if the grpc_chttp2_transport is ready to send a window update, do so here also; 3/4 is a magic number that will likely get tuned soon */ - if (t->announce_incoming_window > 0) { - uint32_t announced = - (uint32_t)GPR_MIN(t->announce_incoming_window, UINT32_MAX); - GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, announce_incoming_window, - announced); + uint32_t target_incoming_window = GPR_MAX( + t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + 1024); + uint32_t threshold_to_send_transport_window_update = + t->outbuf.count > 0 ? 3 * target_incoming_window / 4 + : target_incoming_window / 2; + if (t->incoming_window <= threshold_to_send_transport_window_update) { + maybe_initiate_ping(exec_ctx, t, + GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE); + uint32_t announced = (uint32_t)GPR_CLAMP( + target_incoming_window - t->incoming_window, 0, UINT32_MAX); + GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("write", t, incoming_window, announced); grpc_transport_one_way_stats throwaway_stats; grpc_slice_buffer_add(&t->outbuf, grpc_chttp2_window_update_create( 0, announced, &throwaway_stats)); + t->ping_state.pings_before_data_required = + t->ping_policy.max_pings_without_data; + } + + for (size_t i = 0; i < t->ping_ack_count; i++) { + grpc_slice_buffer_add(&t->outbuf, + grpc_chttp2_ping_create(1, t->ping_acks[i])); } + t->ping_ack_count = 0; + + maybe_initiate_ping(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE); GPR_TIMER_END("grpc_chttp2_begin_write", 0); |