aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c122
-rw-r--r--src/core/ext/transport/chttp2/transport/flow_control.c177
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h32
-rw-r--r--src/core/lib/transport/bdp_estimator.c10
-rw-r--r--src/core/lib/transport/bdp_estimator.h10
-rw-r--r--test/core/transport/bdp_estimator_test.c7
6 files changed, 222 insertions, 136 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 8976686082..7bad188f4e 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -304,10 +304,10 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
keepalive_watchdog_fired_locked, t,
grpc_combiner_scheduler(t->combiner));
- grpc_bdp_estimator_init(&t->bdp_estimator, t->peer_string);
- t->last_pid_update = gpr_now(GPR_CLOCK_MONOTONIC);
+ grpc_bdp_estimator_init(&t->flow_control.bdp_estimator, t->peer_string);
+ t->flow_control.last_pid_update = gpr_now(GPR_CLOCK_MONOTONIC);
grpc_pid_controller_init(
- &t->pid_controller,
+ &t->flow_control.pid_controller,
(grpc_pid_controller_args){.gain_p = 4,
.gain_i = 8,
.gain_d = 0,
@@ -340,7 +340,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
t->sent_local_settings = 0;
t->write_buffer_size = DEFAULT_WINDOW;
- t->enable_bdp_probe = true;
+ t->flow_control.enable_bdp_probe = true;
if (is_client) {
grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string(
@@ -457,7 +457,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
(grpc_integer_options){0, 0, MAX_WRITE_BUFFER_SIZE});
} else if (0 ==
strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) {
- t->enable_bdp_probe = grpc_channel_arg_get_integer(
+ t->flow_control.enable_bdp_probe = grpc_channel_arg_get_integer(
&channel_args->args[i], (grpc_integer_options){1, 0, 1});
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_KEEPALIVE_TIME_MS)) {
@@ -2253,46 +2253,27 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx,
case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE:
break;
}
-}
-
-static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- double bdp_dbl) {
- // initial window size bounded [1,2^31-1], but we set the min to 128.
- int32_t bdp = GPR_CLAMP((int32_t)bdp_dbl, 128, INT32_MAX);
- int64_t delta =
- (int64_t)bdp -
- (int64_t)t->settings[GRPC_LOCAL_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- if (delta == 0 || (delta > -bdp / 10 && delta < bdp / 10)) {
- return;
- }
- if (GRPC_TRACER_ON(grpc_bdp_estimator_trace) ||
- GRPC_TRACER_ON(grpc_flowctl_trace)) {
- gpr_log(GPR_DEBUG, "%s | %p[%s] | update initial window size to %d",
- t->peer_string, t, t->is_client ? "cli" : "svr", (int)bdp);
- }
- queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
- (uint32_t)bdp);
-}
-
-static void update_frame(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- double bw_dbl, double bdp_dbl) {
- int32_t bdp = (int32_t)GPR_CLAMP(bdp_dbl, 128.0, INT32_MAX);
- int32_t target = (int32_t)GPR_MAX(bw_dbl / 1000, bdp);
- // frame size is bounded [2^14,2^24-1]
- int32_t frame_size = GPR_CLAMP(target, 16384, 16777215);
- int64_t delta = (int64_t)frame_size -
- (int64_t)t->settings[GRPC_LOCAL_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE];
- if (delta == 0 || (delta > -frame_size / 10 && delta < frame_size / 10)) {
- return;
+ if (action.send_setting_update != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) {
+ if (action.initial_window_size > 0) {
+ queue_setting_update(exec_ctx, t,
+ GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
+ (uint32_t)action.initial_window_size);
+ }
+ if (action.max_frame_size > 0) {
+ queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
+ (uint32_t)action.max_frame_size);
+ }
+ if (action.send_setting_update == GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY) {
+ grpc_chttp2_initiate_write(exec_ctx, t, "immediate setting update");
+ }
}
- if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) {
- gpr_log(GPR_DEBUG, "%s: update max_frame size to %d", t->peer_string,
- (int)frame_size);
+ if (action.need_ping) {
+ GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
+ grpc_bdp_estimator_schedule_ping(&t->flow_control.bdp_estimator);
+ send_ping_locked(exec_ctx, t,
+ GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE,
+ &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked);
}
- queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
- (uint32_t)frame_size);
}
static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
@@ -2330,7 +2311,6 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_BEGIN("reading_action_locked", 0);
grpc_chttp2_transport *t = tp;
- bool need_bdp_ping = false;
GRPC_ERROR_REF(error);
@@ -2349,11 +2329,9 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
GRPC_ERROR_NONE};
for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
- if (grpc_bdp_estimator_add_incoming_bytes(
- &t->bdp_estimator,
- (int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i]))) {
- need_bdp_ping = true;
- }
+ grpc_bdp_estimator_add_incoming_bytes(
+ &t->flow_control.bdp_estimator,
+ (int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i]));
errors[1] =
grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]);
}
@@ -2400,45 +2378,9 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
if (keep_reading) {
grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer,
&t->read_action_locked);
-
- if (t->enable_bdp_probe) {
- if (need_bdp_ping) {
- GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
- grpc_bdp_estimator_schedule_ping(&t->bdp_estimator);
- send_ping_locked(exec_ctx, t,
- GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE,
- &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked);
- }
-
- int64_t estimate = -1;
- double bdp_guess = -1;
- if (grpc_bdp_estimator_get_estimate(&t->bdp_estimator, &estimate)) {
- double target = 1 + log2((double)estimate);
- double memory_pressure = grpc_resource_quota_get_memory_pressure(
- grpc_resource_user_quota(grpc_endpoint_get_resource_user(t->ep)));
- if (memory_pressure > 0.8) {
- target *= 1 - GPR_MIN(1, (memory_pressure - 0.8) / 0.1);
- }
- double bdp_error =
- target - grpc_pid_controller_last(&t->pid_controller);
- gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
- gpr_timespec dt_timespec = gpr_time_sub(now, t->last_pid_update);
- double dt = (double)dt_timespec.tv_sec + dt_timespec.tv_nsec * 1e-9;
- if (dt > 0.1) {
- dt = 0.1;
- }
- double log2_bdp_guess =
- grpc_pid_controller_update(&t->pid_controller, bdp_error, dt);
- bdp_guess = pow(2, log2_bdp_guess);
- update_bdp(exec_ctx, t, bdp_guess);
- t->last_pid_update = now;
- }
-
- double bw = -1;
- if (grpc_bdp_estimator_get_bw(&t->bdp_estimator, &bw)) {
- update_frame(exec_ctx, t, bw, bdp_guess);
- }
- }
+ grpc_chttp2_act_on_flowctl_action(
+ exec_ctx, grpc_chttp2_flowctl_get_bdp_action(&t->flow_control), t,
+ NULL);
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading");
} else {
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action");
@@ -2461,7 +2403,7 @@ static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer);
}
- grpc_bdp_estimator_start_ping(&t->bdp_estimator);
+ grpc_bdp_estimator_start_ping(&t->flow_control.bdp_estimator);
}
static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
@@ -2470,7 +2412,7 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
if (GRPC_TRACER_ON(grpc_http_trace)) {
gpr_log(GPR_DEBUG, "%s: Complete BDP ping", t->peer_string);
}
- grpc_bdp_estimator_complete_ping(&t->bdp_estimator);
+ grpc_bdp_estimator_complete_ping(&t->flow_control.bdp_estimator);
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
}
diff --git a/src/core/ext/transport/chttp2/transport/flow_control.c b/src/core/ext/transport/chttp2/transport/flow_control.c
index c9f7eabd43..8dbdd1290a 100644
--- a/src/core/ext/transport/chttp2/transport/flow_control.c
+++ b/src/core/ext/transport/chttp2/transport/flow_control.c
@@ -18,6 +18,7 @@
#include "src/core/ext/transport/chttp2/transport/internal.h"
+#include <math.h>
#include <string.h>
#include <grpc/support/alloc.h>
@@ -39,6 +40,8 @@ typedef struct {
int64_t remote_window_delta;
int64_t local_window_delta;
int64_t announced_window_delta;
+ uint32_t local_init_window;
+ uint32_t local_max_frame;
} shadow_flow_control;
static void pretrace(shadow_flow_control* shadow_fc,
@@ -54,14 +57,28 @@ static void pretrace(shadow_flow_control* shadow_fc,
}
}
-static char* fmt_str(int64_t old, int64_t new) {
+#define TRACE_PADDING 30
+
+static char* fmt_int64_diff_str(int64_t old, int64_t new) {
char* str;
if (old != new) {
gpr_asprintf(&str, "%" PRId64 " -> %" PRId64 "", old, new);
} else {
gpr_asprintf(&str, "%" PRId64 "", old);
}
- char* str_lp = gpr_leftpad(str, ' ', 30);
+ char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING);
+ gpr_free(str);
+ return str_lp;
+}
+
+static char* fmt_uint32_diff_str(uint32_t old, uint32_t new) {
+ char* str;
+ if (new > 0 && old != new) {
+ gpr_asprintf(&str, "%" PRIu32 " -> %" PRIu32 "", old, new);
+ } else {
+ gpr_asprintf(&str, "%" PRIu32 "", old);
+ }
+ char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING);
gpr_free(str);
return str_lp;
}
@@ -75,24 +92,28 @@ static void posttrace(shadow_flow_control* shadow_fc,
uint32_t remote_window =
tfc->t->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- char* trw_str = fmt_str(shadow_fc->remote_window, tfc->remote_window);
- char* tlw_str = fmt_str(shadow_fc->target_window,
- grpc_chttp2_target_announced_window(tfc));
- char* taw_str = fmt_str(shadow_fc->announced_window, tfc->announced_window);
+ char* trw_str =
+ fmt_int64_diff_str(shadow_fc->remote_window, tfc->remote_window);
+ char* tlw_str = fmt_int64_diff_str(shadow_fc->target_window,
+ grpc_chttp2_target_announced_window(tfc));
+ char* taw_str =
+ fmt_int64_diff_str(shadow_fc->announced_window, tfc->announced_window);
char* srw_str;
char* slw_str;
char* saw_str;
if (sfc != NULL) {
- srw_str = fmt_str(shadow_fc->remote_window_delta + remote_window,
- sfc->remote_window_delta + remote_window);
- slw_str = fmt_str(shadow_fc->local_window_delta + acked_local_window,
- sfc->local_window_delta + acked_local_window);
- saw_str = fmt_str(shadow_fc->announced_window_delta + acked_local_window,
- sfc->announced_window_delta + acked_local_window);
+ srw_str = fmt_int64_diff_str(shadow_fc->remote_window_delta + remote_window,
+ sfc->remote_window_delta + remote_window);
+ slw_str =
+ fmt_int64_diff_str(shadow_fc->local_window_delta + acked_local_window,
+ sfc->local_window_delta + acked_local_window);
+ saw_str = fmt_int64_diff_str(
+ shadow_fc->announced_window_delta + acked_local_window,
+ sfc->announced_window_delta + acked_local_window);
} else {
- srw_str = gpr_leftpad("", ' ', 30);
- slw_str = gpr_leftpad("", ' ', 30);
- saw_str = gpr_leftpad("", ' ', 30);
+ srw_str = gpr_leftpad("", ' ', TRACE_PADDING);
+ slw_str = gpr_leftpad("", ' ', TRACE_PADDING);
+ saw_str = gpr_leftpad("", ' ', TRACE_PADDING);
}
gpr_log(GPR_DEBUG,
"%p[%u][%s] | %s | trw:%s, ttw:%s, taw:%s, srw:%s, slw:%s, saw:%s",
@@ -120,10 +141,21 @@ static char* urgency_to_string(grpc_chttp2_flowctl_urgency urgency) {
GPR_UNREACHABLE_CODE(return "unknown");
}
-static void trace_action(grpc_chttp2_flowctl_action action) {
- gpr_log(GPR_DEBUG, "transport: %s, stream: %s",
+static void trace_action(grpc_chttp2_transport_flowctl* tfc,
+ grpc_chttp2_flowctl_action action) {
+ char* iw_str = fmt_uint32_diff_str(
+ tfc->t->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+ action.initial_window_size);
+ char* mf_str = fmt_uint32_diff_str(
+ tfc->t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
+ action.max_frame_size);
+ gpr_log(GPR_DEBUG, "t[%s], s[%s], settings[%s] iw:%s mf:%s",
urgency_to_string(action.send_transport_update),
- urgency_to_string(action.send_stream_update));
+ urgency_to_string(action.send_stream_update),
+ urgency_to_string(action.send_setting_update), iw_str, mf_str);
+ gpr_free(iw_str);
+ gpr_free(mf_str);
}
#define PRETRACE(tfc, sfc) \
@@ -131,11 +163,12 @@ static void trace_action(grpc_chttp2_flowctl_action action) {
GRPC_FLOW_CONTROL_IF_TRACING(pretrace(&shadow_fc, tfc, sfc))
#define POSTTRACE(tfc, sfc, reason) \
GRPC_FLOW_CONTROL_IF_TRACING(posttrace(&shadow_fc, tfc, sfc, reason))
-#define TRACEACTION(action) GRPC_FLOW_CONTROL_IF_TRACING(trace_action(action))
+#define TRACEACTION(tfc, action) \
+ GRPC_FLOW_CONTROL_IF_TRACING(trace_action(tfc, action))
#else
#define PRETRACE(tfc, sfc)
#define POSTTRACE(tfc, sfc, reason)
-#define TRACEACTION(action)
+#define TRACEACTION(tfc, action)
#endif
/* How many bytes of incoming flow control would we like to advertise */
@@ -342,15 +375,58 @@ void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_transport_flowctl* tfc,
announced_window_delta_preupdate(tfc, sfc);
}
+// Returns an urgency with which to make an update
+static grpc_chttp2_flowctl_urgency delta_is_significant(
+ const grpc_chttp2_transport_flowctl* tfc, int32_t value,
+ grpc_chttp2_setting_id setting_id) {
+ int64_t delta = (int64_t)value -
+ (int64_t)tfc->t->settings[GRPC_LOCAL_SETTINGS][setting_id];
+ // TODO(ncteisen): tune this
+ if (delta != 0 && (delta <= -value / 5 || delta >= value / 5)) {
+ return GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE;
+ } else {
+ return GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED;
+ }
+}
+
+// Takes in a target and uses the pid controller to return a stabilized
+// guess at the new bdp.
+static double get_pid_controller_guess(grpc_chttp2_transport_flowctl* tfc,
+ double target) {
+ double bdp_error = target - grpc_pid_controller_last(&tfc->pid_controller);
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ gpr_timespec dt_timespec = gpr_time_sub(now, tfc->last_pid_update);
+ double dt = (double)dt_timespec.tv_sec + dt_timespec.tv_nsec * 1e-9;
+ if (dt > 0.1) {
+ dt = 0.1;
+ }
+ double log2_bdp_guess =
+ grpc_pid_controller_update(&tfc->pid_controller, bdp_error, dt);
+ tfc->last_pid_update = now;
+ return pow(2, log2_bdp_guess);
+}
+
+// Take in a target and modifies it based on the memory pressure of the system
+static double get_target_under_memory_pressure(
+ grpc_chttp2_transport_flowctl* tfc, double target) {
+ // do not increase window under heavy memory pressure.
+ double memory_pressure = grpc_resource_quota_get_memory_pressure(
+ grpc_resource_user_quota(grpc_endpoint_get_resource_user(tfc->t->ep)));
+ if (memory_pressure > 0.8) {
+ target *= 1 - GPR_MIN(1, (memory_pressure - 0.8) / 0.1);
+ }
+ return target;
+}
+
grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
- const grpc_chttp2_transport_flowctl* tfc,
- const grpc_chttp2_stream_flowctl* sfc) {
+ grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) {
grpc_chttp2_flowctl_action action;
memset(&action, 0, sizeof(action));
uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc);
if (tfc->announced_window < target_announced_window / 2) {
action.send_transport_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY;
}
+ // TODO(ncteisen): tune this
if (sfc != NULL && !sfc->s->read_closed) {
uint32_t sent_init_window =
tfc->t->settings[GRPC_SENT_SETTINGS]
@@ -364,6 +440,61 @@ grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
action.send_stream_update = GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE;
}
}
- TRACEACTION(action);
+ TRACEACTION(tfc, action);
+ return action;
+}
+
+grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_bdp_action(
+ grpc_chttp2_transport_flowctl* tfc) {
+ grpc_chttp2_flowctl_action action;
+ memset(&action, 0, sizeof(action));
+ if (tfc->enable_bdp_probe) {
+ action.need_ping = grpc_bdp_estimator_need_ping(&tfc->bdp_estimator);
+
+ // get bdp estimate and update initial_window accordingly.
+ int64_t estimate = -1;
+ int32_t bdp = -1;
+ if (grpc_bdp_estimator_get_estimate(&tfc->bdp_estimator, &estimate)) {
+ double target = 1 + log2((double)estimate);
+
+ // target might change based on how much memory pressure we are under
+ // TODO(ncteisen): experiment with setting target to be huge under low
+ // memory pressure.
+ target = get_target_under_memory_pressure(tfc, target);
+
+ // run our target through the pid controller to stabilize change.
+ // TODO(ncteisen): experiment with other controllers here.
+ double bdp_guess = get_pid_controller_guess(tfc, target);
+
+ // Though initial window 'could' drop to 0, we keep the floor at 128
+ bdp = GPR_MAX((int32_t)bdp_guess, 128);
+
+ grpc_chttp2_flowctl_urgency init_window_update_urgency =
+ delta_is_significant(tfc, bdp,
+ GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE);
+ if (init_window_update_urgency != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) {
+ action.send_setting_update = init_window_update_urgency;
+ action.initial_window_size = (uint32_t)bdp;
+ }
+ }
+
+ // get bandwidth estimate and update max_frame accordingly.
+ double bw_dbl = -1;
+ if (grpc_bdp_estimator_get_bw(&tfc->bdp_estimator, &bw_dbl)) {
+ // we target the max of BDP or bandwidth in microseconds.
+ int32_t frame_size =
+ GPR_CLAMP(GPR_MAX((int32_t)bw_dbl / 1000, bdp), 16384, 16777215);
+ grpc_chttp2_flowctl_urgency frame_size_urgency = delta_is_significant(
+ tfc, frame_size, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE);
+ if (frame_size_urgency != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) {
+ if (frame_size_urgency > action.send_setting_update) {
+ action.send_setting_update = frame_size_urgency;
+ }
+ action.max_frame_size = (uint32_t)frame_size;
+ }
+ }
+ }
+
+ TRACEACTION(tfc, action);
return action;
}
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index f26f14dbec..3c41a8958f 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -238,7 +238,17 @@ typedef struct {
* to send WINDOW_UPDATE frames. */
int64_t announced_window;
- // read only pointer back to transport for certain data
+ /** should we probe bdp? */
+ bool enable_bdp_probe;
+
+ /* bdp estimation */
+ grpc_bdp_estimator bdp_estimator;
+
+ /* pid controller */
+ grpc_pid_controller pid_controller;
+ gpr_timespec last_pid_update;
+
+ // pointer back to transport for tracing
const grpc_chttp2_transport *t;
} grpc_chttp2_transport_flowctl;
@@ -261,9 +271,6 @@ struct grpc_chttp2_transport {
/** is there a read request to the endpoint outstanding? */
uint8_t endpoint_reading;
- /** should we probe bdp? */
- bool enable_bdp_probe;
-
grpc_chttp2_optimization_target opt_target;
/** various lists of streams */
@@ -358,13 +365,6 @@ struct grpc_chttp2_transport {
grpc_chttp2_transport_flowctl flow_control;
- /* bdp estimation */
- grpc_bdp_estimator bdp_estimator;
-
- /* pid controller */
- grpc_pid_controller pid_controller;
- gpr_timespec last_pid_update;
-
/* deframing */
grpc_chttp2_deframe_transport_state deframe_state;
uint8_t incoming_frame_type;
@@ -704,13 +704,19 @@ typedef enum {
typedef struct {
grpc_chttp2_flowctl_urgency send_stream_update;
grpc_chttp2_flowctl_urgency send_transport_update;
+ grpc_chttp2_flowctl_urgency send_setting_update;
+ uint32_t initial_window_size;
+ uint32_t max_frame_size;
+ bool need_ping;
} grpc_chttp2_flowctl_action;
// Reads the flow control data and returns and actionable struct that will tell
// chttp2 exactly what it needs to do
grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
- const grpc_chttp2_transport_flowctl *tfc,
- const grpc_chttp2_stream_flowctl *sfc);
+ grpc_chttp2_transport_flowctl *tfc, grpc_chttp2_stream_flowctl *sfc);
+
+grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_bdp_action(
+ grpc_chttp2_transport_flowctl *tfc);
// Takes in a flow control action and performs all the needed operations.
void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/transport/bdp_estimator.c b/src/core/lib/transport/bdp_estimator.c
index 311ae6390d..8b57693413 100644
--- a/src/core/lib/transport/bdp_estimator.c
+++ b/src/core/lib/transport/bdp_estimator.c
@@ -33,20 +33,24 @@ void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name) {
estimator->bw_est = 0;
}
-bool grpc_bdp_estimator_get_estimate(grpc_bdp_estimator *estimator,
+bool grpc_bdp_estimator_get_estimate(const grpc_bdp_estimator *estimator,
int64_t *estimate) {
*estimate = estimator->estimate;
return true;
}
-bool grpc_bdp_estimator_get_bw(grpc_bdp_estimator *estimator, double *bw) {
+bool grpc_bdp_estimator_get_bw(const grpc_bdp_estimator *estimator,
+ double *bw) {
*bw = estimator->bw_est;
return true;
}
-bool grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator,
+void grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator,
int64_t num_bytes) {
estimator->accumulator += num_bytes;
+}
+
+bool grpc_bdp_estimator_need_ping(const grpc_bdp_estimator *estimator) {
switch (estimator->ping_state) {
case GRPC_BDP_PING_UNSCHEDULED:
return true;
diff --git a/src/core/lib/transport/bdp_estimator.h b/src/core/lib/transport/bdp_estimator.h
index a232d1f87f..1ef0dc99dd 100644
--- a/src/core/lib/transport/bdp_estimator.h
+++ b/src/core/lib/transport/bdp_estimator.h
@@ -47,13 +47,15 @@ typedef struct grpc_bdp_estimator {
void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name);
// Returns true if a reasonable estimate could be obtained
-bool grpc_bdp_estimator_get_estimate(grpc_bdp_estimator *estimator,
+bool grpc_bdp_estimator_get_estimate(const grpc_bdp_estimator *estimator,
int64_t *estimate);
-// Returns true if a reasonable estimate could be obtained
-bool grpc_bdp_estimator_get_bw(grpc_bdp_estimator *estimator, double *bw);
+// Tracks new bytes read.
+bool grpc_bdp_estimator_get_bw(const grpc_bdp_estimator *estimator, double *bw);
// Returns true if the user should schedule a ping
-bool grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator,
+void grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator,
int64_t num_bytes);
+// Returns true if the user should schedule a ping
+bool grpc_bdp_estimator_need_ping(const grpc_bdp_estimator *estimator);
// Schedule a ping: call in response to receiving a true from
// grpc_bdp_estimator_add_incoming_bytes once a ping has been scheduled by a
// transport (but not necessarily started)
diff --git a/test/core/transport/bdp_estimator_test.c b/test/core/transport/bdp_estimator_test.c
index e2612c7718..dda48f45b1 100644
--- a/test/core/transport/bdp_estimator_test.c
+++ b/test/core/transport/bdp_estimator_test.c
@@ -43,12 +43,13 @@ static void test_get_estimate_no_samples(void) {
static void add_samples(grpc_bdp_estimator *estimator, int64_t *samples,
size_t n) {
- GPR_ASSERT(grpc_bdp_estimator_add_incoming_bytes(estimator, 1234567) == true);
+ grpc_bdp_estimator_add_incoming_bytes(estimator, 1234567);
+ GPR_ASSERT(grpc_bdp_estimator_need_ping(estimator) == true);
grpc_bdp_estimator_schedule_ping(estimator);
grpc_bdp_estimator_start_ping(estimator);
for (size_t i = 0; i < n; i++) {
- GPR_ASSERT(grpc_bdp_estimator_add_incoming_bytes(estimator, samples[i]) ==
- false);
+ grpc_bdp_estimator_add_incoming_bytes(estimator, samples[i]);
+ GPR_ASSERT(grpc_bdp_estimator_need_ping(estimator) == false);
}
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(1, GPR_TIMESPAN)));