aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/transport/chttp2/transport/chttp2_transport.cc')
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.cc256
1 files changed, 148 insertions, 108 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index e4b19a2c4a..02fc53122d 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -54,7 +54,6 @@
#include "src/core/lib/transport/transport.h"
#include "src/core/lib/transport/transport_impl.h"
-#define DEFAULT_WINDOW 65535
#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
#define MAX_WINDOW 0x7fffffffu
#define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
@@ -152,10 +151,14 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_error *error);
+static void schedule_bdp_ping_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t);
static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error);
static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error);
+static void next_bdp_ping_timer_expired_locked(grpc_exec_ctx *exec_ctx,
+ void *tp, grpc_error *error);
static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_error *error);
@@ -218,6 +221,9 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
t->write_cb_pool = next;
}
+ t->flow_control.Destroy();
+
+ GRPC_ERROR_UNREF(t->closed_with_error);
gpr_free(t->ping_acks);
gpr_free(t->peer_string);
gpr_free(t);
@@ -275,10 +281,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->endpoint_reading = 1;
t->next_stream_id = is_client ? 1 : 2;
t->is_client = is_client;
- t->flow_control.remote_window = DEFAULT_WINDOW;
- t->flow_control.announced_window = DEFAULT_WINDOW;
- t->flow_control.target_initial_window_size = DEFAULT_WINDOW;
- t->flow_control.t = t;
t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->is_first_frame = true;
grpc_connectivity_state_init(
@@ -303,6 +305,9 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_combiner_scheduler(t->combiner));
GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t,
grpc_combiner_scheduler(t->combiner));
+ GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
+ next_bdp_ping_timer_expired_locked, t,
+ grpc_combiner_scheduler(t->combiner));
GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping_locked,
t, grpc_combiner_scheduler(t->combiner));
GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
@@ -315,8 +320,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
keepalive_watchdog_fired_locked, t,
grpc_combiner_scheduler(t->combiner));
- grpc_bdp_estimator_init(&t->flow_control.bdp_estimator, t->peer_string);
-
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
grpc_chttp2_hpack_parser_init(exec_ctx, &t->hpack_parser);
@@ -340,8 +343,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
window -- this should by rights be 0 */
t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
t->sent_local_settings = 0;
- t->write_buffer_size = DEFAULT_WINDOW;
- t->flow_control.enable_bdp_probe = true;
+ t->write_buffer_size = grpc_core::chttp2::kDefaultWindow;
if (is_client) {
grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string(
@@ -386,6 +388,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
+ bool enable_bdp = true;
+
if (channel_args) {
for (i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key,
@@ -446,8 +450,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
&channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE});
} else if (0 ==
strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) {
- t->flow_control.enable_bdp_probe =
- grpc_channel_arg_get_integer(&channel_args->args[i], {1, 0, 1});
+ enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true);
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_KEEPALIVE_TIME_MS)) {
const int value = grpc_channel_arg_get_integer(
@@ -542,6 +545,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
+ t->flow_control.Init(exec_ctx, t, enable_bdp);
+
/* No pings allowed before receiving a header or data frame. */
t->ping_state.pings_before_data_required = 0;
t->ping_state.is_delayed_ping_timer_set = false;
@@ -562,10 +567,13 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
}
- grpc_chttp2_act_on_flowctl_action(
- exec_ctx,
- grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, NULL), t,
- NULL);
+ if (enable_bdp) {
+ GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
+ schedule_bdp_ping_locked(exec_ctx, t);
+
+ grpc_chttp2_act_on_flowctl_action(
+ exec_ctx, t->flow_control->PeriodicUpdate(exec_ctx), t, NULL);
+ }
grpc_chttp2_initiate_write(exec_ctx, t,
GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE);
@@ -595,7 +603,9 @@ static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_error *error) {
- if (!t->closed) {
+ end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error));
+ cancel_pings(exec_ctx, t, GRPC_ERROR_REF(error));
+ if (t->closed_with_error == GRPC_ERROR_NONE) {
if (!grpc_error_has_clear_grpc_status(error)) {
error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAVAILABLE);
@@ -610,13 +620,16 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_error_add_child(t->close_transport_on_writes_finished, error);
return;
}
- t->closed = 1;
+ GPR_ASSERT(error != GRPC_ERROR_NONE);
+ t->closed_with_error = GRPC_ERROR_REF(error);
connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "close_transport");
- grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error));
if (t->ping_state.is_delayed_ping_timer_set) {
grpc_timer_cancel(exec_ctx, &t->ping_state.delayed_ping_timer);
}
+ if (t->have_next_bdp_ping_timer) {
+ grpc_timer_cancel(exec_ctx, &t->next_bdp_ping_timer);
+ }
switch (t->keepalive_state) {
case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING:
grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
@@ -636,8 +649,8 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close");
}
- end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error));
- cancel_pings(exec_ctx, t, GRPC_ERROR_REF(error));
+ GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
+ grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
}
@@ -698,7 +711,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
post_destructive_reclaimer(exec_ctx, t);
}
- s->flow_control.s = s;
+ s->flow_control.Init(t->flow_control.get(), s);
GPR_TIMER_END("init_stream", 0);
return 0;
@@ -749,7 +762,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GRPC_ERROR_UNREF(s->write_closed_error);
GRPC_ERROR_UNREF(s->byte_stream_error);
- grpc_chttp2_flowctl_destroy_stream(&t->flow_control, &s->flow_control);
+ s->flow_control.Destroy();
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "stream");
@@ -940,7 +953,8 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
void grpc_chttp2_mark_stream_writable(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
- if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) {
+ if (t->closed_with_error == GRPC_ERROR_NONE &&
+ grpc_chttp2_list_add_writable_stream(t, s)) {
GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become");
}
}
@@ -993,7 +1007,7 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt,
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
grpc_chttp2_begin_write_result r;
- if (t->closed) {
+ if (t->closed_with_error != GRPC_ERROR_NONE) {
r.writing = false;
} else {
r = grpc_chttp2_begin_write(exec_ctx, t);
@@ -1456,7 +1470,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
if (!s->write_closed) {
if (t->is_client) {
- if (!t->closed) {
+ if (t->closed_with_error == GRPC_ERROR_NONE) {
GPR_ASSERT(s->id == 0);
grpc_chttp2_list_add_waiting_for_concurrency(t, s);
maybe_start_some_streams(exec_ctx, t);
@@ -1464,7 +1478,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
grpc_chttp2_cancel_stream(
exec_ctx, t, s,
grpc_error_set_int(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport closed"),
+ GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Transport closed", &t->closed_with_error, 1),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
}
} else {
@@ -1616,13 +1631,10 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
if (s->id != 0) {
if (!s->read_closed) {
already_received = s->frame_storage.length;
- grpc_chttp2_flowctl_incoming_bs_update(
- &t->flow_control, &s->flow_control, GRPC_HEADER_SIZE_IN_BYTES,
- already_received);
- grpc_chttp2_act_on_flowctl_action(
- exec_ctx, grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control,
- &s->flow_control),
- t, s);
+ s->flow_control->IncomingByteStreamUpdate(GRPC_HEADER_SIZE_IN_BYTES,
+ already_received);
+ grpc_chttp2_act_on_flowctl_action(exec_ctx,
+ s->flow_control->MakeAction(), t, s);
}
}
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
@@ -1686,6 +1698,7 @@ static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
/* callback remaining pings: they're not allowed to call into the transpot,
and maybe they hold resources that need to be freed */
grpc_chttp2_ping_queue *pq = &t->ping_queue;
+ GPR_ASSERT(error != GRPC_ERROR_NONE);
for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) {
grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error));
GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pq->lists[j]);
@@ -1695,6 +1708,12 @@ static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_closure *on_initiate, grpc_closure *on_ack) {
+ if (t->closed_with_error != GRPC_ERROR_NONE) {
+ GRPC_CLOSURE_SCHED(exec_ctx, on_initiate,
+ GRPC_ERROR_REF(t->closed_with_error));
+ GRPC_CLOSURE_SCHED(exec_ctx, on_ack, GRPC_ERROR_REF(t->closed_with_error));
+ return;
+ }
grpc_chttp2_ping_queue *pq = &t->ping_queue;
grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate,
GRPC_ERROR_NONE);
@@ -1753,7 +1772,9 @@ void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
/*The transport will be closed after the write is done */
close_transport_locked(
- exec_ctx, t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"));
+ exec_ctx, t, grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
}
}
@@ -2389,55 +2410,44 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
* INPUT PROCESSING - PARSING
*/
-void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_flowctl_action action,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s) {
- switch (action.send_stream_update) {
- case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED:
+template <class F>
+static void WithUrgency(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_core::chttp2::FlowControlAction::Urgency urgency,
+ grpc_chttp2_initiate_write_reason reason, F action) {
+ switch (urgency) {
+ case grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED:
break;
- case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY:
- grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
- grpc_chttp2_initiate_write(
- exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL);
- break;
- case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE:
- grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
- break;
- }
- switch (action.send_transport_update) {
- case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED:
- break;
- case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY:
- grpc_chttp2_initiate_write(
- exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL);
- break;
- // this is the same as no action b/c every time the transport enters the
- // writing path it will maybe do an update
- case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE:
+ case grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
+ grpc_chttp2_initiate_write(exec_ctx, t, reason);
+ // fallthrough
+ case grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE:
+ action();
break;
}
- if (action.send_setting_update != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) {
- if (action.initial_window_size > 0) {
- queue_setting_update(exec_ctx, t,
- GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
- (uint32_t)action.initial_window_size);
- }
- if (action.max_frame_size > 0) {
- queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
- (uint32_t)action.max_frame_size);
- }
- if (action.send_setting_update == GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY) {
- grpc_chttp2_initiate_write(exec_ctx, t,
- GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS);
- }
- }
- if (action.need_ping) {
- GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
- grpc_bdp_estimator_schedule_ping(&t->flow_control.bdp_estimator);
- send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked,
- &t->finish_bdp_ping_locked);
- }
+}
+
+void grpc_chttp2_act_on_flowctl_action(
+ grpc_exec_ctx *exec_ctx, const grpc_core::chttp2::FlowControlAction &action,
+ grpc_chttp2_transport *t, grpc_chttp2_stream *s) {
+ WithUrgency(
+ exec_ctx, t, action.send_stream_update(),
+ GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
+ [exec_ctx, t, s]() { grpc_chttp2_mark_stream_writable(exec_ctx, t, s); });
+ WithUrgency(exec_ctx, t, action.send_transport_update(),
+ GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, []() {});
+ WithUrgency(exec_ctx, t, action.send_initial_window_update(),
+ GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS,
+ [exec_ctx, t, &action]() {
+ queue_setting_update(exec_ctx, t,
+ GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
+ action.initial_window_size());
+ });
+ WithUrgency(
+ exec_ctx, t, action.send_max_frame_size_update(),
+ GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [exec_ctx, t, &action]() {
+ queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
+ action.max_frame_size());
+ });
}
static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
@@ -2487,14 +2497,13 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
}
GPR_SWAP(grpc_error *, err, error);
GRPC_ERROR_UNREF(err);
- if (!t->closed) {
+ if (t->closed_with_error == GRPC_ERROR_NONE) {
GPR_TIMER_BEGIN("reading_action.parse", 0);
size_t i = 0;
grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
GRPC_ERROR_NONE};
for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
- grpc_bdp_estimator_add_incoming_bytes(
- &t->flow_control.bdp_estimator,
+ t->flow_control->bdp_estimator()->AddIncomingBytes(
(int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i]));
errors[1] =
grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]);
@@ -2511,8 +2520,8 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_END("reading_action.parse", 0);
GPR_TIMER_BEGIN("post_parse_locked", 0);
- if (t->flow_control.initial_window_update != 0) {
- if (t->flow_control.initial_window_update > 0) {
+ if (t->initial_window_update != 0) {
+ if (t->initial_window_update > 0) {
grpc_chttp2_stream *s;
while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
@@ -2521,20 +2530,21 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING);
}
}
- t->flow_control.initial_window_update = 0;
+ t->initial_window_update = 0;
}
GPR_TIMER_END("post_parse_locked", 0);
}
GPR_TIMER_BEGIN("post_reading_action_locked", 0);
bool keep_reading = false;
- if (error == GRPC_ERROR_NONE && t->closed) {
- error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport closed");
+ if (error == GRPC_ERROR_NONE && t->closed_with_error != GRPC_ERROR_NONE) {
+ error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Transport closed", &t->closed_with_error, 1);
}
if (error != GRPC_ERROR_NONE) {
close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error));
t->endpoint_reading = 0;
- } else if (!t->closed) {
+ } else if (t->closed_with_error == GRPC_ERROR_NONE) {
keep_reading = true;
GRPC_CHTTP2_REF_TRANSPORT(t, "keep_reading");
}
@@ -2543,10 +2553,8 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
if (keep_reading) {
grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer,
&t->read_action_locked);
- grpc_chttp2_act_on_flowctl_action(
- exec_ctx,
- grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, NULL), t,
- NULL);
+ grpc_chttp2_act_on_flowctl_action(exec_ctx, t->flow_control->MakeAction(),
+ t, NULL);
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading");
} else {
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action");
@@ -2559,28 +2567,60 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_END("reading_action_locked", 0);
}
+// t is reffed prior to calling the first time, and once the callback chain
+// that kicks off finishes, it's unreffed
+static void schedule_bdp_ping_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t) {
+ t->flow_control->bdp_estimator()->SchedulePing();
+ send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked,
+ &t->finish_bdp_ping_locked);
+}
+
static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp;
if (GRPC_TRACER_ON(grpc_http_trace)) {
- gpr_log(GPR_DEBUG, "%s: Start BDP ping", t->peer_string);
+ gpr_log(GPR_DEBUG, "%s: Start BDP ping err=%s", t->peer_string,
+ grpc_error_string(error));
}
/* Reset the keepalive ping timer */
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
}
- grpc_bdp_estimator_start_ping(&t->flow_control.bdp_estimator);
+ t->flow_control->bdp_estimator()->StartPing();
}
static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp;
if (GRPC_TRACER_ON(grpc_http_trace)) {
- gpr_log(GPR_DEBUG, "%s: Complete BDP ping", t->peer_string);
+ gpr_log(GPR_DEBUG, "%s: Complete BDP ping err=%s", t->peer_string,
+ grpc_error_string(error));
}
- grpc_bdp_estimator_complete_ping(exec_ctx, &t->flow_control.bdp_estimator);
+ if (error != GRPC_ERROR_NONE) {
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
+ return;
+ }
+ grpc_millis next_ping =
+ t->flow_control->bdp_estimator()->CompletePing(exec_ctx);
+ grpc_chttp2_act_on_flowctl_action(
+ exec_ctx, t->flow_control->PeriodicUpdate(exec_ctx), t, nullptr);
+ GPR_ASSERT(!t->have_next_bdp_ping_timer);
+ t->have_next_bdp_ping_timer = true;
+ grpc_timer_init(exec_ctx, &t->next_bdp_ping_timer, next_ping,
+ &t->next_bdp_ping_timer_expired_locked);
+}
- GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
+static void next_bdp_ping_timer_expired_locked(grpc_exec_ctx *exec_ctx,
+ void *tp, grpc_error *error) {
+ grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp;
+ GPR_ASSERT(t->have_next_bdp_ping_timer);
+ t->have_next_bdp_ping_timer = false;
+ if (error != GRPC_ERROR_NONE) {
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
+ return;
+ }
+ schedule_bdp_ping_locked(exec_ctx, t);
}
void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args,
@@ -2645,7 +2685,7 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)arg;
GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
- if (t->destroying || t->closed) {
+ if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
} else if (error == GRPC_ERROR_NONE) {
if (t->keepalive_permit_without_calls ||
@@ -2703,8 +2743,11 @@ static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg,
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
if (error == GRPC_ERROR_NONE) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
- close_transport_locked(exec_ctx, t, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "keepalive watchdog timeout"));
+ close_transport_locked(
+ exec_ctx, t,
+ grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "keepalive watchdog timeout"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL));
}
} else {
/* The watchdog timer should have been cancelled by
@@ -2787,13 +2830,10 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
size_t cur_length = s->frame_storage.length;
if (!s->read_closed) {
- grpc_chttp2_flowctl_incoming_bs_update(&t->flow_control, &s->flow_control,
- bs->next_action.max_size_hint,
- cur_length);
- grpc_chttp2_act_on_flowctl_action(
- exec_ctx, grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control,
- &s->flow_control),
- t, s);
+ s->flow_control->IncomingByteStreamUpdate(bs->next_action.max_size_hint,
+ cur_length);
+ grpc_chttp2_act_on_flowctl_action(exec_ctx, s->flow_control->MakeAction(),
+ t, s);
}
GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
if (s->frame_storage.length > 0) {