aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport/chttp2/transport/flow_control.cc
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2017-09-21 18:56:08 -0700
committerGravatar Yash Tibrewal <yashkt@google.com>2017-10-02 16:29:41 -0700
commit83062842c3601faeddcae8f901c515e3c78f3661 (patch)
tree58536b25350a5f2b3123161ab4ca9b8ce01a43a5 /src/core/ext/transport/chttp2/transport/flow_control.cc
parent1c9b584a153ff592c92b6cf6baa620d8602a37cd (diff)
Changes for C to C++. Adding extern C to header files for compatibility.
Also converting to .cc
Diffstat (limited to 'src/core/ext/transport/chttp2/transport/flow_control.cc')
-rw-r--r--src/core/ext/transport/chttp2/transport/flow_control.cc502
1 files changed, 502 insertions, 0 deletions
diff --git a/src/core/ext/transport/chttp2/transport/flow_control.cc b/src/core/ext/transport/chttp2/transport/flow_control.cc
new file mode 100644
index 0000000000..569a6349d3
--- /dev/null
+++ b/src/core/ext/transport/chttp2/transport/flow_control.cc
@@ -0,0 +1,502 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/core/ext/transport/chttp2/transport/internal.h"
+
+#include <limits.h>
+#include <math.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
+
+#include "src/core/lib/support/string.h"
+
+static uint32_t grpc_chttp2_target_announced_window(
+ const grpc_chttp2_transport_flowctl* tfc);
+
+#ifndef NDEBUG
+
+typedef struct {
+ int64_t remote_window;
+ int64_t target_window;
+ int64_t announced_window;
+ int64_t remote_window_delta;
+ int64_t local_window_delta;
+ int64_t announced_window_delta;
+ uint32_t local_init_window;
+ uint32_t local_max_frame;
+} shadow_flow_control;
+
+static void pretrace(shadow_flow_control* shadow_fc,
+ grpc_chttp2_transport_flowctl* tfc,
+ grpc_chttp2_stream_flowctl* sfc) {
+ shadow_fc->remote_window = tfc->remote_window;
+ shadow_fc->target_window = grpc_chttp2_target_announced_window(tfc);
+ shadow_fc->announced_window = tfc->announced_window;
+ if (sfc != NULL) {
+ shadow_fc->remote_window_delta = sfc->remote_window_delta;
+ shadow_fc->local_window_delta = sfc->local_window_delta;
+ shadow_fc->announced_window_delta = sfc->announced_window_delta;
+ }
+}
+
+#define TRACE_PADDING 30
+
+static char* fmt_int64_diff_str(int64_t old_val, int64_t new_val) {
+ char* str;
+ if (old_val != new_val) {
+ gpr_asprintf(&str, "%" PRId64 " -> %" PRId64 "", old_val, new_val);
+ } else {
+ gpr_asprintf(&str, "%" PRId64 "", old_val);
+ }
+ char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING);
+ gpr_free(str);
+ return str_lp;
+}
+
+static char* fmt_uint32_diff_str(uint32_t old_val, uint32_t new_val) {
+ char* str;
+ if (new_val > 0 && old_val != new_val) {
+ gpr_asprintf(&str, "%" PRIu32 " -> %" PRIu32 "", old_val, new_val);
+ } else {
+ gpr_asprintf(&str, "%" PRIu32 "", old_val);
+ }
+ char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING);
+ gpr_free(str);
+ return str_lp;
+}
+
+static void posttrace(shadow_flow_control* shadow_fc,
+ grpc_chttp2_transport_flowctl* tfc,
+ grpc_chttp2_stream_flowctl* sfc, char* reason) {
+ uint32_t acked_local_window =
+ tfc->t->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ uint32_t remote_window =
+ tfc->t->settings[GRPC_PEER_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ char* trw_str =
+ fmt_int64_diff_str(shadow_fc->remote_window, tfc->remote_window);
+ char* tlw_str = fmt_int64_diff_str(shadow_fc->target_window,
+ grpc_chttp2_target_announced_window(tfc));
+ char* taw_str =
+ fmt_int64_diff_str(shadow_fc->announced_window, tfc->announced_window);
+ char* srw_str;
+ char* slw_str;
+ char* saw_str;
+ if (sfc != NULL) {
+ srw_str = fmt_int64_diff_str(shadow_fc->remote_window_delta + remote_window,
+ sfc->remote_window_delta + remote_window);
+ slw_str =
+ fmt_int64_diff_str(shadow_fc->local_window_delta + acked_local_window,
+ sfc->local_window_delta + acked_local_window);
+ saw_str = fmt_int64_diff_str(
+ shadow_fc->announced_window_delta + acked_local_window,
+ sfc->announced_window_delta + acked_local_window);
+ } else {
+ srw_str = gpr_leftpad("", ' ', TRACE_PADDING);
+ slw_str = gpr_leftpad("", ' ', TRACE_PADDING);
+ saw_str = gpr_leftpad("", ' ', TRACE_PADDING);
+ }
+ gpr_log(GPR_DEBUG,
+ "%p[%u][%s] | %s | trw:%s, ttw:%s, taw:%s, srw:%s, slw:%s, saw:%s",
+ tfc, sfc != NULL ? sfc->s->id : 0, tfc->t->is_client ? "cli" : "svr",
+ reason, trw_str, tlw_str, taw_str, srw_str, slw_str, saw_str);
+ gpr_free(trw_str);
+ gpr_free(tlw_str);
+ gpr_free(taw_str);
+ gpr_free(srw_str);
+ gpr_free(slw_str);
+ gpr_free(saw_str);
+}
+
+static char* urgency_to_string(grpc_chttp2_flowctl_urgency urgency) {
+ switch (urgency) {
+ case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED:
+ return "no action";
+ case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY:
+ return "update immediately";
+ case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE:
+ return "queue update";
+ default:
+ GPR_UNREACHABLE_CODE(return "unknown");
+ }
+ GPR_UNREACHABLE_CODE(return "unknown");
+}
+
+static void trace_action(grpc_chttp2_transport_flowctl* tfc,
+ grpc_chttp2_flowctl_action action) {
+ char* iw_str = fmt_uint32_diff_str(
+ tfc->t->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+ action.initial_window_size);
+ char* mf_str = fmt_uint32_diff_str(
+ tfc->t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
+ action.max_frame_size);
+ gpr_log(GPR_DEBUG, "t[%s], s[%s], settings[%s] iw:%s mf:%s",
+ urgency_to_string(action.send_transport_update),
+ urgency_to_string(action.send_stream_update),
+ urgency_to_string(action.send_setting_update), iw_str, mf_str);
+ gpr_free(iw_str);
+ gpr_free(mf_str);
+}
+
+#define PRETRACE(tfc, sfc) \
+ shadow_flow_control shadow_fc; \
+ GRPC_FLOW_CONTROL_IF_TRACING(pretrace(&shadow_fc, tfc, sfc))
+#define POSTTRACE(tfc, sfc, reason) \
+ GRPC_FLOW_CONTROL_IF_TRACING(posttrace(&shadow_fc, tfc, sfc, reason))
+#define TRACEACTION(tfc, action) \
+ GRPC_FLOW_CONTROL_IF_TRACING(trace_action(tfc, action))
+#else
+#define PRETRACE(tfc, sfc)
+#define POSTTRACE(tfc, sfc, reason)
+#define TRACEACTION(tfc, action)
+#endif
+
+/* How many bytes of incoming flow control would we like to advertise */
+static uint32_t grpc_chttp2_target_announced_window(
+ const grpc_chttp2_transport_flowctl* tfc) {
+ return (uint32_t)GPR_MIN(
+ (int64_t)((1u << 31) - 1),
+ tfc->announced_stream_total_over_incoming_window +
+ tfc->t->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
+}
+
+// we have sent data on the wire, we must track this in our bookkeeping for the
+// remote peer's flow control.
+void grpc_chttp2_flowctl_sent_data(grpc_chttp2_transport_flowctl* tfc,
+ grpc_chttp2_stream_flowctl* sfc,
+ int64_t size) {
+ PRETRACE(tfc, sfc);
+ tfc->remote_window -= size;
+ sfc->remote_window_delta -= size;
+ POSTTRACE(tfc, sfc, " data sent");
+}
+
+static void announced_window_delta_preupdate(grpc_chttp2_transport_flowctl* tfc,
+ grpc_chttp2_stream_flowctl* sfc) {
+ if (sfc->announced_window_delta > 0) {
+ tfc->announced_stream_total_over_incoming_window -=
+ sfc->announced_window_delta;
+ } else {
+ tfc->announced_stream_total_under_incoming_window +=
+ -sfc->announced_window_delta;
+ }
+}
+
+static void announced_window_delta_postupdate(
+ grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) {
+ if (sfc->announced_window_delta > 0) {
+ tfc->announced_stream_total_over_incoming_window +=
+ sfc->announced_window_delta;
+ } else {
+ tfc->announced_stream_total_under_incoming_window -=
+ -sfc->announced_window_delta;
+ }
+}
+
+// We have received data from the wire. We must track this in our own flow
+// control bookkeeping.
+// Returns an error if the incoming frame violates our flow control.
+grpc_error* grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl* tfc,
+ grpc_chttp2_stream_flowctl* sfc,
+ int64_t incoming_frame_size) {
+ uint32_t sent_init_window =
+ tfc->t->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ uint32_t acked_init_window =
+ tfc->t->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ PRETRACE(tfc, sfc);
+ if (incoming_frame_size > tfc->announced_window) {
+ char* msg;
+ gpr_asprintf(&msg,
+ "frame of size %" PRId64 " overflows local window of %" PRId64,
+ incoming_frame_size, tfc->announced_window);
+ grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
+ gpr_free(msg);
+ return err;
+ }
+
+ if (sfc != NULL) {
+ int64_t acked_stream_window =
+ sfc->announced_window_delta + acked_init_window;
+ int64_t sent_stream_window = sfc->announced_window_delta + sent_init_window;
+ if (incoming_frame_size > acked_stream_window) {
+ if (incoming_frame_size <= sent_stream_window) {
+ gpr_log(
+ GPR_ERROR,
+ "Incoming frame of size %" PRId64
+ " exceeds local window size of %" PRId64
+ ".\n"
+ "The (un-acked, future) window size would be %" PRId64
+ " which is not exceeded.\n"
+ "This would usually cause a disconnection, but allowing it due to"
+ "broken HTTP2 implementations in the wild.\n"
+ "See (for example) https://github.com/netty/netty/issues/6520.",
+ incoming_frame_size, acked_stream_window, sent_stream_window);
+ } else {
+ char* msg;
+ gpr_asprintf(&msg, "frame of size %" PRId64
+ " overflows local window of %" PRId64,
+ incoming_frame_size, acked_stream_window);
+ grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
+ gpr_free(msg);
+ return err;
+ }
+ }
+
+ announced_window_delta_preupdate(tfc, sfc);
+ sfc->announced_window_delta -= incoming_frame_size;
+ announced_window_delta_postupdate(tfc, sfc);
+ sfc->local_window_delta -= incoming_frame_size;
+ }
+
+ tfc->announced_window -= incoming_frame_size;
+
+ POSTTRACE(tfc, sfc, " data recv");
+ return GRPC_ERROR_NONE;
+}
+
+// Returns a non zero announce integer if we should send a transport window
+// update
+uint32_t grpc_chttp2_flowctl_maybe_send_transport_update(
+ grpc_chttp2_transport_flowctl* tfc) {
+ PRETRACE(tfc, NULL);
+ uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc);
+ uint32_t threshold_to_send_transport_window_update =
+ tfc->t->outbuf.count > 0 ? 3 * target_announced_window / 4
+ : target_announced_window / 2;
+ if (tfc->announced_window <= threshold_to_send_transport_window_update &&
+ tfc->announced_window != target_announced_window) {
+ uint32_t announce = (uint32_t)GPR_CLAMP(
+ target_announced_window - tfc->announced_window, 0, UINT32_MAX);
+ tfc->announced_window += announce;
+ POSTTRACE(tfc, NULL, "t updt sent");
+ return announce;
+ }
+ GRPC_FLOW_CONTROL_IF_TRACING(
+ gpr_log(GPR_DEBUG, "%p[0][%s] will not send transport update", tfc,
+ tfc->t->is_client ? "cli" : "svr"));
+ return 0;
+}
+
+// Returns a non zero announce integer if we should send a stream window update
+uint32_t grpc_chttp2_flowctl_maybe_send_stream_update(
+ grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) {
+ PRETRACE(tfc, sfc);
+ if (sfc->local_window_delta > sfc->announced_window_delta) {
+ uint32_t announce = (uint32_t)GPR_CLAMP(
+ sfc->local_window_delta - sfc->announced_window_delta, 0, UINT32_MAX);
+ announced_window_delta_preupdate(tfc, sfc);
+ sfc->announced_window_delta += announce;
+ announced_window_delta_postupdate(tfc, sfc);
+ POSTTRACE(tfc, sfc, "s updt sent");
+ return announce;
+ }
+ GRPC_FLOW_CONTROL_IF_TRACING(
+ gpr_log(GPR_DEBUG, "%p[%u][%s] will not send stream update", tfc,
+ sfc->s->id, tfc->t->is_client ? "cli" : "svr"));
+ return 0;
+}
+
+// we have received a WINDOW_UPDATE frame for a transport
+void grpc_chttp2_flowctl_recv_transport_update(
+ grpc_chttp2_transport_flowctl* tfc, uint32_t size) {
+ PRETRACE(tfc, NULL);
+ tfc->remote_window += size;
+ POSTTRACE(tfc, NULL, "t updt recv");
+}
+
+// we have received a WINDOW_UPDATE frame for a stream
+void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_transport_flowctl* tfc,
+ grpc_chttp2_stream_flowctl* sfc,
+ uint32_t size) {
+ PRETRACE(tfc, sfc);
+ sfc->remote_window_delta += size;
+ POSTTRACE(tfc, sfc, "s updt recv");
+}
+
+void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl* tfc,
+ grpc_chttp2_stream_flowctl* sfc,
+ size_t max_size_hint,
+ size_t have_already) {
+ PRETRACE(tfc, sfc);
+ uint32_t max_recv_bytes;
+ uint32_t sent_init_window =
+ tfc->t->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+
+ /* clamp max recv hint to an allowable size */
+ if (max_size_hint >= UINT32_MAX - sent_init_window) {
+ max_recv_bytes = UINT32_MAX - sent_init_window;
+ } else {
+ max_recv_bytes = (uint32_t)max_size_hint;
+ }
+
+ /* account for bytes already received but unknown to higher layers */
+ if (max_recv_bytes >= have_already) {
+ max_recv_bytes -= (uint32_t)have_already;
+ } else {
+ max_recv_bytes = 0;
+ }
+
+ /* add some small lookahead to keep pipelines flowing */
+ GPR_ASSERT(max_recv_bytes <= UINT32_MAX - sent_init_window);
+ if (sfc->local_window_delta < max_recv_bytes) {
+ uint32_t add_max_recv_bytes =
+ (uint32_t)(max_recv_bytes - sfc->local_window_delta);
+ sfc->local_window_delta += add_max_recv_bytes;
+ }
+ POSTTRACE(tfc, sfc, "app st recv");
+}
+
+void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_transport_flowctl* tfc,
+ grpc_chttp2_stream_flowctl* sfc) {
+ announced_window_delta_preupdate(tfc, sfc);
+}
+
+// Returns an urgency with which to make an update
+static grpc_chttp2_flowctl_urgency delta_is_significant(
+ const grpc_chttp2_transport_flowctl* tfc, int32_t value,
+ grpc_chttp2_setting_id setting_id) {
+ int64_t delta = (int64_t)value -
+ (int64_t)tfc->t->settings[GRPC_LOCAL_SETTINGS][setting_id];
+ // TODO(ncteisen): tune this
+ if (delta != 0 && (delta <= -value / 5 || delta >= value / 5)) {
+ return GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE;
+ } else {
+ return GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED;
+ }
+}
+
+// Takes in a target and uses the pid controller to return a stabilized
+// guess at the new bdp.
+static double get_pid_controller_guess(grpc_chttp2_transport_flowctl* tfc,
+ double target) {
+ double bdp_error = target - grpc_pid_controller_last(&tfc->pid_controller);
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ gpr_timespec dt_timespec = gpr_time_sub(now, tfc->last_pid_update);
+ double dt = (double)dt_timespec.tv_sec + dt_timespec.tv_nsec * 1e-9;
+ if (dt > 0.1) {
+ dt = 0.1;
+ }
+ double log2_bdp_guess =
+ grpc_pid_controller_update(&tfc->pid_controller, bdp_error, dt);
+ tfc->last_pid_update = now;
+ return pow(2, log2_bdp_guess);
+}
+
+// Take in a target and modifies it based on the memory pressure of the system
+static double get_target_under_memory_pressure(
+ grpc_chttp2_transport_flowctl* tfc, double target) {
+ // do not increase window under heavy memory pressure.
+ double memory_pressure = grpc_resource_quota_get_memory_pressure(
+ grpc_resource_user_quota(grpc_endpoint_get_resource_user(tfc->t->ep)));
+ if (memory_pressure > 0.8) {
+ target *= 1 - GPR_MIN(1, (memory_pressure - 0.8) / 0.1);
+ }
+ return target;
+}
+
+grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action(
+ grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) {
+ grpc_chttp2_flowctl_action action;
+ memset(&action, 0, sizeof(action));
+ uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc);
+ if (tfc->announced_window < target_announced_window / 2) {
+ action.send_transport_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY;
+ }
+ // TODO(ncteisen): tune this
+ if (sfc != NULL && !sfc->s->read_closed) {
+ uint32_t sent_init_window =
+ tfc->t->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ if ((int64_t)sfc->local_window_delta >
+ (int64_t)sfc->announced_window_delta &&
+ (int64_t)sfc->announced_window_delta + sent_init_window <=
+ sent_init_window / 2) {
+ action.send_stream_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY;
+ } else if (sfc->local_window_delta > sfc->announced_window_delta) {
+ action.send_stream_update = GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE;
+ }
+ }
+ TRACEACTION(tfc, action);
+ return action;
+}
+
+grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_bdp_action(
+ grpc_chttp2_transport_flowctl* tfc) {
+ grpc_chttp2_flowctl_action action;
+ memset(&action, 0, sizeof(action));
+ if (tfc->enable_bdp_probe) {
+ action.need_ping = grpc_bdp_estimator_need_ping(&tfc->bdp_estimator);
+
+ // get bdp estimate and update initial_window accordingly.
+ int64_t estimate = -1;
+ int32_t bdp = -1;
+ if (grpc_bdp_estimator_get_estimate(&tfc->bdp_estimator, &estimate)) {
+ double target = 1 + log2((double)estimate);
+
+ // target might change based on how much memory pressure we are under
+ // TODO(ncteisen): experiment with setting target to be huge under low
+ // memory pressure.
+ target = get_target_under_memory_pressure(tfc, target);
+
+ // run our target through the pid controller to stabilize change.
+ // TODO(ncteisen): experiment with other controllers here.
+ double bdp_guess = get_pid_controller_guess(tfc, target);
+
+ // Though initial window 'could' drop to 0, we keep the floor at 128
+ bdp = GPR_MAX((int32_t)bdp_guess, 128);
+
+ grpc_chttp2_flowctl_urgency init_window_update_urgency =
+ delta_is_significant(tfc, bdp,
+ GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE);
+ if (init_window_update_urgency != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) {
+ action.send_setting_update = init_window_update_urgency;
+ action.initial_window_size = (uint32_t)bdp;
+ }
+ }
+
+ // get bandwidth estimate and update max_frame accordingly.
+ double bw_dbl = -1;
+ if (grpc_bdp_estimator_get_bw(&tfc->bdp_estimator, &bw_dbl)) {
+ // we target the max of BDP or bandwidth in microseconds.
+ int32_t frame_size = (int32_t)GPR_CLAMP(
+ GPR_MAX((int32_t)GPR_CLAMP(bw_dbl, 0, INT_MAX) / 1000, bdp), 16384,
+ 16777215);
+ grpc_chttp2_flowctl_urgency frame_size_urgency = delta_is_significant(
+ tfc, frame_size, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE);
+ if (frame_size_urgency != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) {
+ if (frame_size_urgency > action.send_setting_update) {
+ action.send_setting_update = frame_size_urgency;
+ }
+ action.max_frame_size = (uint32_t)frame_size;
+ }
+ }
+ }
+
+ TRACEACTION(tfc, action);
+ return action;
+}