diff options
Diffstat (limited to 'src/core/ext/transport/chttp2/transport/chttp2_transport.cc')
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.cc | 256 |
1 files changed, 148 insertions, 108 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index e4b19a2c4a..02fc53122d 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -54,7 +54,6 @@ #include "src/core/lib/transport/transport.h" #include "src/core/lib/transport/transport_impl.h" -#define DEFAULT_WINDOW 65535 #define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024) #define MAX_WINDOW 0x7fffffffu #define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024) @@ -152,10 +151,14 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error); +static void schedule_bdp_ping_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t); static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error); static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error); +static void next_bdp_ping_timer_expired_locked(grpc_exec_ctx *exec_ctx, + void *tp, grpc_error *error); static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error); @@ -218,6 +221,9 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, t->write_cb_pool = next; } + t->flow_control.Destroy(); + + GRPC_ERROR_UNREF(t->closed_with_error); gpr_free(t->ping_acks); gpr_free(t->peer_string); gpr_free(t); @@ -275,10 +281,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->endpoint_reading = 1; t->next_stream_id = is_client ? 1 : 2; t->is_client = is_client; - t->flow_control.remote_window = DEFAULT_WINDOW; - t->flow_control.announced_window = DEFAULT_WINDOW; - t->flow_control.target_initial_window_size = DEFAULT_WINDOW; - t->flow_control.t = t; t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; t->is_first_frame = true; grpc_connectivity_state_init( @@ -303,6 +305,9 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_combiner_scheduler(t->combiner)); GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t, grpc_combiner_scheduler(t->combiner)); + GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked, + next_bdp_ping_timer_expired_locked, t, + grpc_combiner_scheduler(t->combiner)); GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping_locked, t, grpc_combiner_scheduler(t->combiner)); GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, @@ -315,8 +320,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, keepalive_watchdog_fired_locked, t, grpc_combiner_scheduler(t->combiner)); - grpc_bdp_estimator_init(&t->flow_control.bdp_estimator, t->peer_string); - grpc_chttp2_goaway_parser_init(&t->goaway_parser); grpc_chttp2_hpack_parser_init(exec_ctx, &t->hpack_parser); @@ -340,8 +343,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, window -- this should by rights be 0 */ t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; t->sent_local_settings = 0; - t->write_buffer_size = DEFAULT_WINDOW; - t->flow_control.enable_bdp_probe = true; + t->write_buffer_size = grpc_core::chttp2::kDefaultWindow; if (is_client) { grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string( @@ -386,6 +388,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; + bool enable_bdp = true; + if (channel_args) { for (i = 0; i < channel_args->num_args; i++) { if (0 == strcmp(channel_args->args[i].key, @@ -446,8 +450,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, &channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE}); } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) { - t->flow_control.enable_bdp_probe = - grpc_channel_arg_get_integer(&channel_args->args[i], {1, 0, 1}); + enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true); } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) { const int value = grpc_channel_arg_get_integer( @@ -542,6 +545,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } + t->flow_control.Init(exec_ctx, t, enable_bdp); + /* No pings allowed before receiving a header or data frame. */ t->ping_state.pings_before_data_required = 0; t->ping_state.is_delayed_ping_timer_set = false; @@ -562,10 +567,13 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED; } - grpc_chttp2_act_on_flowctl_action( - exec_ctx, - grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, NULL), t, - NULL); + if (enable_bdp) { + GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); + schedule_bdp_ping_locked(exec_ctx, t); + + grpc_chttp2_act_on_flowctl_action( + exec_ctx, t->flow_control->PeriodicUpdate(exec_ctx), t, NULL); + } grpc_chttp2_initiate_write(exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE); @@ -595,7 +603,9 @@ static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error) { - if (!t->closed) { + end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error)); + cancel_pings(exec_ctx, t, GRPC_ERROR_REF(error)); + if (t->closed_with_error == GRPC_ERROR_NONE) { if (!grpc_error_has_clear_grpc_status(error)) { error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); @@ -610,13 +620,16 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_error_add_child(t->close_transport_on_writes_finished, error); return; } - t->closed = 1; + GPR_ASSERT(error != GRPC_ERROR_NONE); + t->closed_with_error = GRPC_ERROR_REF(error); connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "close_transport"); - grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error)); if (t->ping_state.is_delayed_ping_timer_set) { grpc_timer_cancel(exec_ctx, &t->ping_state.delayed_ping_timer); } + if (t->have_next_bdp_ping_timer) { + grpc_timer_cancel(exec_ctx, &t->next_bdp_ping_timer); + } switch (t->keepalive_state) { case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING: grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); @@ -636,8 +649,8 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, while (grpc_chttp2_list_pop_writable_stream(t, &s)) { GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close"); } - end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error)); - cancel_pings(exec_ctx, t, GRPC_ERROR_REF(error)); + GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE); + grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error)); } GRPC_ERROR_UNREF(error); } @@ -698,7 +711,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, post_destructive_reclaimer(exec_ctx, t); } - s->flow_control.s = s; + s->flow_control.Init(t->flow_control.get(), s); GPR_TIMER_END("init_stream", 0); return 0; @@ -749,7 +762,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, GRPC_ERROR_UNREF(s->write_closed_error); GRPC_ERROR_UNREF(s->byte_stream_error); - grpc_chttp2_flowctl_destroy_stream(&t->flow_control, &s->flow_control); + s->flow_control.Destroy(); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "stream"); @@ -940,7 +953,8 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, void grpc_chttp2_mark_stream_writable(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s) { - if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) { + if (t->closed_with_error == GRPC_ERROR_NONE && + grpc_chttp2_list_add_writable_stream(t, s)) { GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become"); } } @@ -993,7 +1007,7 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt, grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE); grpc_chttp2_begin_write_result r; - if (t->closed) { + if (t->closed_with_error != GRPC_ERROR_NONE) { r.writing = false; } else { r = grpc_chttp2_begin_write(exec_ctx, t); @@ -1456,7 +1470,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } if (!s->write_closed) { if (t->is_client) { - if (!t->closed) { + if (t->closed_with_error == GRPC_ERROR_NONE) { GPR_ASSERT(s->id == 0); grpc_chttp2_list_add_waiting_for_concurrency(t, s); maybe_start_some_streams(exec_ctx, t); @@ -1464,7 +1478,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_chttp2_cancel_stream( exec_ctx, t, s, grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport closed"), + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Transport closed", &t->closed_with_error, 1), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); } } else { @@ -1616,13 +1631,10 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, if (s->id != 0) { if (!s->read_closed) { already_received = s->frame_storage.length; - grpc_chttp2_flowctl_incoming_bs_update( - &t->flow_control, &s->flow_control, GRPC_HEADER_SIZE_IN_BYTES, - already_received); - grpc_chttp2_act_on_flowctl_action( - exec_ctx, grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, - &s->flow_control), - t, s); + s->flow_control->IncomingByteStreamUpdate(GRPC_HEADER_SIZE_IN_BYTES, + already_received); + grpc_chttp2_act_on_flowctl_action(exec_ctx, + s->flow_control->MakeAction(), t, s); } } grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); @@ -1686,6 +1698,7 @@ static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, /* callback remaining pings: they're not allowed to call into the transpot, and maybe they hold resources that need to be freed */ grpc_chttp2_ping_queue *pq = &t->ping_queue; + GPR_ASSERT(error != GRPC_ERROR_NONE); for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) { grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error)); GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pq->lists[j]); @@ -1695,6 +1708,12 @@ static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_closure *on_initiate, grpc_closure *on_ack) { + if (t->closed_with_error != GRPC_ERROR_NONE) { + GRPC_CLOSURE_SCHED(exec_ctx, on_initiate, + GRPC_ERROR_REF(t->closed_with_error)); + GRPC_CLOSURE_SCHED(exec_ctx, on_ack, GRPC_ERROR_REF(t->closed_with_error)); + return; + } grpc_chttp2_ping_queue *pq = &t->ping_queue; grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate, GRPC_ERROR_NONE); @@ -1753,7 +1772,9 @@ void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx, GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM)); /*The transport will be closed after the write is done */ close_transport_locked( - exec_ctx, t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings")); + exec_ctx, t, grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); } } @@ -2389,55 +2410,44 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, * INPUT PROCESSING - PARSING */ -void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, - grpc_chttp2_flowctl_action action, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s) { - switch (action.send_stream_update) { - case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED: +template <class F> +static void WithUrgency(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_core::chttp2::FlowControlAction::Urgency urgency, + grpc_chttp2_initiate_write_reason reason, F action) { + switch (urgency) { + case grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED: break; - case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY: - grpc_chttp2_mark_stream_writable(exec_ctx, t, s); - grpc_chttp2_initiate_write( - exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL); - break; - case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE: - grpc_chttp2_mark_stream_writable(exec_ctx, t, s); - break; - } - switch (action.send_transport_update) { - case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED: - break; - case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY: - grpc_chttp2_initiate_write( - exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL); - break; - // this is the same as no action b/c every time the transport enters the - // writing path it will maybe do an update - case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE: + case grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY: + grpc_chttp2_initiate_write(exec_ctx, t, reason); + // fallthrough + case grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE: + action(); break; } - if (action.send_setting_update != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) { - if (action.initial_window_size > 0) { - queue_setting_update(exec_ctx, t, - GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, - (uint32_t)action.initial_window_size); - } - if (action.max_frame_size > 0) { - queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, - (uint32_t)action.max_frame_size); - } - if (action.send_setting_update == GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY) { - grpc_chttp2_initiate_write(exec_ctx, t, - GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS); - } - } - if (action.need_ping) { - GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); - grpc_bdp_estimator_schedule_ping(&t->flow_control.bdp_estimator); - send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked, - &t->finish_bdp_ping_locked); - } +} + +void grpc_chttp2_act_on_flowctl_action( + grpc_exec_ctx *exec_ctx, const grpc_core::chttp2::FlowControlAction &action, + grpc_chttp2_transport *t, grpc_chttp2_stream *s) { + WithUrgency( + exec_ctx, t, action.send_stream_update(), + GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL, + [exec_ctx, t, s]() { grpc_chttp2_mark_stream_writable(exec_ctx, t, s); }); + WithUrgency(exec_ctx, t, action.send_transport_update(), + GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, []() {}); + WithUrgency(exec_ctx, t, action.send_initial_window_update(), + GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, + [exec_ctx, t, &action]() { + queue_setting_update(exec_ctx, t, + GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, + action.initial_window_size()); + }); + WithUrgency( + exec_ctx, t, action.send_max_frame_size_update(), + GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [exec_ctx, t, &action]() { + queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, + action.max_frame_size()); + }); } static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, @@ -2487,14 +2497,13 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, } GPR_SWAP(grpc_error *, err, error); GRPC_ERROR_UNREF(err); - if (!t->closed) { + if (t->closed_with_error == GRPC_ERROR_NONE) { GPR_TIMER_BEGIN("reading_action.parse", 0); size_t i = 0; grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE, GRPC_ERROR_NONE}; for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) { - grpc_bdp_estimator_add_incoming_bytes( - &t->flow_control.bdp_estimator, + t->flow_control->bdp_estimator()->AddIncomingBytes( (int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i])); errors[1] = grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]); @@ -2511,8 +2520,8 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, GPR_TIMER_END("reading_action.parse", 0); GPR_TIMER_BEGIN("post_parse_locked", 0); - if (t->flow_control.initial_window_update != 0) { - if (t->flow_control.initial_window_update > 0) { + if (t->initial_window_update != 0) { + if (t->initial_window_update > 0) { grpc_chttp2_stream *s; while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) { grpc_chttp2_mark_stream_writable(exec_ctx, t, s); @@ -2521,20 +2530,21 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING); } } - t->flow_control.initial_window_update = 0; + t->initial_window_update = 0; } GPR_TIMER_END("post_parse_locked", 0); } GPR_TIMER_BEGIN("post_reading_action_locked", 0); bool keep_reading = false; - if (error == GRPC_ERROR_NONE && t->closed) { - error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport closed"); + if (error == GRPC_ERROR_NONE && t->closed_with_error != GRPC_ERROR_NONE) { + error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Transport closed", &t->closed_with_error, 1); } if (error != GRPC_ERROR_NONE) { close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error)); t->endpoint_reading = 0; - } else if (!t->closed) { + } else if (t->closed_with_error == GRPC_ERROR_NONE) { keep_reading = true; GRPC_CHTTP2_REF_TRANSPORT(t, "keep_reading"); } @@ -2543,10 +2553,8 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, if (keep_reading) { grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->read_action_locked); - grpc_chttp2_act_on_flowctl_action( - exec_ctx, - grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, NULL), t, - NULL); + grpc_chttp2_act_on_flowctl_action(exec_ctx, t->flow_control->MakeAction(), + t, NULL); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading"); } else { GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action"); @@ -2559,28 +2567,60 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, GPR_TIMER_END("reading_action_locked", 0); } +// t is reffed prior to calling the first time, and once the callback chain +// that kicks off finishes, it's unreffed +static void schedule_bdp_ping_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t) { + t->flow_control->bdp_estimator()->SchedulePing(); + send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked, + &t->finish_bdp_ping_locked); +} + static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; if (GRPC_TRACER_ON(grpc_http_trace)) { - gpr_log(GPR_DEBUG, "%s: Start BDP ping", t->peer_string); + gpr_log(GPR_DEBUG, "%s: Start BDP ping err=%s", t->peer_string, + grpc_error_string(error)); } /* Reset the keepalive ping timer */ if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) { grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); } - grpc_bdp_estimator_start_ping(&t->flow_control.bdp_estimator); + t->flow_control->bdp_estimator()->StartPing(); } static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; if (GRPC_TRACER_ON(grpc_http_trace)) { - gpr_log(GPR_DEBUG, "%s: Complete BDP ping", t->peer_string); + gpr_log(GPR_DEBUG, "%s: Complete BDP ping err=%s", t->peer_string, + grpc_error_string(error)); } - grpc_bdp_estimator_complete_ping(exec_ctx, &t->flow_control.bdp_estimator); + if (error != GRPC_ERROR_NONE) { + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); + return; + } + grpc_millis next_ping = + t->flow_control->bdp_estimator()->CompletePing(exec_ctx); + grpc_chttp2_act_on_flowctl_action( + exec_ctx, t->flow_control->PeriodicUpdate(exec_ctx), t, nullptr); + GPR_ASSERT(!t->have_next_bdp_ping_timer); + t->have_next_bdp_ping_timer = true; + grpc_timer_init(exec_ctx, &t->next_bdp_ping_timer, next_ping, + &t->next_bdp_ping_timer_expired_locked); +} - GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); +static void next_bdp_ping_timer_expired_locked(grpc_exec_ctx *exec_ctx, + void *tp, grpc_error *error) { + grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; + GPR_ASSERT(t->have_next_bdp_ping_timer); + t->have_next_bdp_ping_timer = false; + if (error != GRPC_ERROR_NONE) { + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); + return; + } + schedule_bdp_ping_locked(exec_ctx, t); } void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args, @@ -2645,7 +2685,7 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)arg; GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); - if (t->destroying || t->closed) { + if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; } else if (error == GRPC_ERROR_NONE) { if (t->keepalive_permit_without_calls || @@ -2703,8 +2743,11 @@ static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg, if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { if (error == GRPC_ERROR_NONE) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; - close_transport_locked(exec_ctx, t, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "keepalive watchdog timeout")); + close_transport_locked( + exec_ctx, t, + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "keepalive watchdog timeout"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL)); } } else { /* The watchdog timer should have been cancelled by @@ -2787,13 +2830,10 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, size_t cur_length = s->frame_storage.length; if (!s->read_closed) { - grpc_chttp2_flowctl_incoming_bs_update(&t->flow_control, &s->flow_control, - bs->next_action.max_size_hint, - cur_length); - grpc_chttp2_act_on_flowctl_action( - exec_ctx, grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, - &s->flow_control), - t, s); + s->flow_control->IncomingByteStreamUpdate(bs->next_action.max_size_hint, + cur_length); + grpc_chttp2_act_on_flowctl_action(exec_ctx, s->flow_control->MakeAction(), + t, s); } GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0); if (s->frame_storage.length > 0) { |