diff options
Diffstat (limited to 'src/core/ext/transport/chttp2/transport/flow_control.cc')
-rw-r--r-- | src/core/ext/transport/chttp2/transport/flow_control.cc | 576 |
1 files changed, 221 insertions, 355 deletions
diff --git a/src/core/ext/transport/chttp2/transport/flow_control.cc b/src/core/ext/transport/chttp2/transport/flow_control.cc index 639e51da70..40545bc74b 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.cc +++ b/src/core/ext/transport/chttp2/transport/flow_control.cc @@ -16,7 +16,7 @@ * */ -#include "src/core/ext/transport/chttp2/transport/internal.h" +#include "src/core/ext/transport/chttp2/transport/flow_control.h" #include <inttypes.h> #include <limits.h> @@ -28,38 +28,15 @@ #include <grpc/support/string_util.h> #include <grpc/support/useful.h> +#include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/lib/support/string.h" -static uint32_t grpc_chttp2_target_announced_window( - const grpc_chttp2_transport_flowctl* tfc); - -#ifndef NDEBUG - -typedef struct { - int64_t remote_window; - int64_t target_window; - int64_t announced_window; - int64_t remote_window_delta; - int64_t local_window_delta; - int64_t announced_window_delta; - uint32_t local_init_window; - uint32_t local_max_frame; -} shadow_flow_control; - -static void pretrace(shadow_flow_control* shadow_fc, - grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_stream_flowctl* sfc) { - shadow_fc->remote_window = tfc->remote_window; - shadow_fc->target_window = grpc_chttp2_target_announced_window(tfc); - shadow_fc->announced_window = tfc->announced_window; - if (sfc != NULL) { - shadow_fc->remote_window_delta = sfc->remote_window_delta; - shadow_fc->local_window_delta = sfc->local_window_delta; - shadow_fc->announced_window_delta = sfc->announced_window_delta; - } -} +namespace grpc_core { +namespace chttp2 { + +namespace { -#define TRACE_PADDING 30 +static constexpr const int kTracePadding = 30; static char* fmt_int64_diff_str(int64_t old_val, int64_t new_val) { char* str; @@ -68,7 +45,7 @@ static char* fmt_int64_diff_str(int64_t old_val, int64_t new_val) { } else { gpr_asprintf(&str, "%" PRId64 "", old_val); } - char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING); + char* str_lp = gpr_leftpad(str, ' ', kTracePadding); gpr_free(str); return str_lp; } @@ -80,47 +57,58 @@ static char* fmt_uint32_diff_str(uint32_t old_val, uint32_t new_val) { } else { gpr_asprintf(&str, "%" PRIu32 "", old_val); } - char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING); + char* str_lp = gpr_leftpad(str, ' ', kTracePadding); gpr_free(str); return str_lp; } +} // namespace + +void FlowControlTrace::Init(const char* reason, TransportFlowControl* tfc, + StreamFlowControl* sfc) { + tfc_ = tfc; + sfc_ = sfc; + reason_ = reason; + remote_window_ = tfc->remote_window(); + target_window_ = tfc->target_window(); + announced_window_ = tfc->announced_window(); + if (sfc != nullptr) { + remote_window_delta_ = sfc->remote_window_delta(); + local_window_delta_ = sfc->local_window_delta(); + announced_window_delta_ = sfc->announced_window_delta(); + } +} -static void posttrace(shadow_flow_control* shadow_fc, - grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_stream_flowctl* sfc, const char* reason) { +void FlowControlTrace::Finish() { uint32_t acked_local_window = - tfc->t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + tfc_->transport()->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; uint32_t remote_window = - tfc->t->settings[GRPC_PEER_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - char* trw_str = - fmt_int64_diff_str(shadow_fc->remote_window, tfc->remote_window); - char* tlw_str = fmt_int64_diff_str(shadow_fc->target_window, - grpc_chttp2_target_announced_window(tfc)); + tfc_->transport()->settings[GRPC_PEER_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + char* trw_str = fmt_int64_diff_str(remote_window_, tfc_->remote_window()); + char* tlw_str = fmt_int64_diff_str(target_window_, tfc_->target_window()); char* taw_str = - fmt_int64_diff_str(shadow_fc->announced_window, tfc->announced_window); + fmt_int64_diff_str(announced_window_, tfc_->announced_window()); char* srw_str; char* slw_str; char* saw_str; - if (sfc != NULL) { - srw_str = fmt_int64_diff_str(shadow_fc->remote_window_delta + remote_window, - sfc->remote_window_delta + remote_window); - slw_str = - fmt_int64_diff_str(shadow_fc->local_window_delta + acked_local_window, - sfc->local_window_delta + acked_local_window); - saw_str = fmt_int64_diff_str( - shadow_fc->announced_window_delta + acked_local_window, - sfc->announced_window_delta + acked_local_window); + if (sfc_ != nullptr) { + srw_str = fmt_int64_diff_str(remote_window_delta_ + remote_window, + sfc_->remote_window_delta() + remote_window); + slw_str = fmt_int64_diff_str(local_window_delta_ + acked_local_window, + local_window_delta_ + acked_local_window); + saw_str = fmt_int64_diff_str(announced_window_delta_ + acked_local_window, + announced_window_delta_ + acked_local_window); } else { - srw_str = gpr_leftpad("", ' ', TRACE_PADDING); - slw_str = gpr_leftpad("", ' ', TRACE_PADDING); - saw_str = gpr_leftpad("", ' ', TRACE_PADDING); + srw_str = gpr_leftpad("", ' ', kTracePadding); + slw_str = gpr_leftpad("", ' ', kTracePadding); + saw_str = gpr_leftpad("", ' ', kTracePadding); } gpr_log(GPR_DEBUG, "%p[%u][%s] | %s | trw:%s, ttw:%s, taw:%s, srw:%s, slw:%s, saw:%s", - tfc, sfc != NULL ? sfc->s->id : 0, tfc->t->is_client ? "cli" : "svr", - reason, trw_str, tlw_str, taw_str, srw_str, slw_str, saw_str); + tfc_, sfc_ != nullptr ? sfc_->stream()->id : 0, + tfc_->transport()->is_client ? "cli" : "svr", reason_, trw_str, + tlw_str, taw_str, srw_str, slw_str, saw_str); gpr_free(trw_str); gpr_free(tlw_str); gpr_free(taw_str); @@ -129,13 +117,13 @@ static void posttrace(shadow_flow_control* shadow_fc, gpr_free(saw_str); } -static const char* urgency_to_string(grpc_chttp2_flowctl_urgency urgency) { - switch (urgency) { - case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED: +const char* FlowControlAction::UrgencyString(Urgency u) { + switch (u) { + case Urgency::NO_ACTION_NEEDED: return "no action"; - case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY: + case Urgency::UPDATE_IMMEDIATELY: return "update immediately"; - case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE: + case Urgency::QUEUE_UPDATE: return "queue update"; default: GPR_UNREACHABLE_CODE(return "unknown"); @@ -143,209 +131,132 @@ static const char* urgency_to_string(grpc_chttp2_flowctl_urgency urgency) { GPR_UNREACHABLE_CODE(return "unknown"); } -static void trace_action(grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_flowctl_action action) { +void FlowControlAction::Trace(grpc_chttp2_transport* t) const { char* iw_str = fmt_uint32_diff_str( - tfc->t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], - action.initial_window_size); + t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + initial_window_size_); char* mf_str = fmt_uint32_diff_str( - tfc->t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], - action.max_frame_size); - gpr_log(GPR_DEBUG, "t[%s], s[%s], settings[%s] iw:%s mf:%s", - urgency_to_string(action.send_transport_update), - urgency_to_string(action.send_stream_update), - urgency_to_string(action.send_setting_update), iw_str, mf_str); + t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], + max_frame_size_); + gpr_log(GPR_DEBUG, "t[%s], s[%s], iw:%s:%s mf:%s:%s", + UrgencyString(send_transport_update_), + UrgencyString(send_stream_update_), + UrgencyString(send_initial_window_update_), iw_str, + UrgencyString(send_max_frame_size_update_), mf_str); gpr_free(iw_str); gpr_free(mf_str); } -#define PRETRACE(tfc, sfc) \ - shadow_flow_control shadow_fc; \ - GRPC_FLOW_CONTROL_IF_TRACING(pretrace(&shadow_fc, tfc, sfc)) -#define POSTTRACE(tfc, sfc, reason) \ - GRPC_FLOW_CONTROL_IF_TRACING(posttrace(&shadow_fc, tfc, sfc, reason)) -#define TRACEACTION(tfc, action) \ - GRPC_FLOW_CONTROL_IF_TRACING(trace_action(tfc, action)) -#else -#define PRETRACE(tfc, sfc) -#define POSTTRACE(tfc, sfc, reason) -#define TRACEACTION(tfc, action) -#endif - -/* How many bytes of incoming flow control would we like to advertise */ -static uint32_t grpc_chttp2_target_announced_window( - const grpc_chttp2_transport_flowctl* tfc) { - return (uint32_t)GPR_MIN((int64_t)((1u << 31) - 1), - tfc->announced_stream_total_over_incoming_window + - tfc->target_initial_window_size); -} - -// we have sent data on the wire, we must track this in our bookkeeping for the -// remote peer's flow control. -void grpc_chttp2_flowctl_sent_data(grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_stream_flowctl* sfc, - int64_t size) { - PRETRACE(tfc, sfc); - tfc->remote_window -= size; - sfc->remote_window_delta -= size; - POSTTRACE(tfc, sfc, " data sent"); -} - -static void announced_window_delta_preupdate(grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_stream_flowctl* sfc) { - if (sfc->announced_window_delta > 0) { - tfc->announced_stream_total_over_incoming_window -= - sfc->announced_window_delta; - } else { - tfc->announced_stream_total_under_incoming_window += - -sfc->announced_window_delta; - } -} - -static void announced_window_delta_postupdate( - grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) { - if (sfc->announced_window_delta > 0) { - tfc->announced_stream_total_over_incoming_window += - sfc->announced_window_delta; - } else { - tfc->announced_stream_total_under_incoming_window -= - -sfc->announced_window_delta; +TransportFlowControl::TransportFlowControl(grpc_exec_ctx* exec_ctx, + const grpc_chttp2_transport* t, + bool enable_bdp_probe) + : t_(t), + enable_bdp_probe_(enable_bdp_probe), + bdp_estimator_(t->peer_string), + pid_controller_(grpc_core::PidController::Args() + .set_gain_p(4) + .set_gain_i(8) + .set_gain_d(0) + .set_initial_control_value(TargetLogBdp()) + .set_min_control_value(-1) + .set_max_control_value(25) + .set_integral_range(10)), + last_pid_update_(grpc_exec_ctx_now(exec_ctx)) {} + +uint32_t TransportFlowControl::MaybeSendUpdate(bool writing_anyway) { + FlowControlTrace trace("t updt sent", this, nullptr); + const uint32_t target_announced_window = (const uint32_t)target_window(); + if ((writing_anyway || announced_window_ <= target_announced_window / 2) && + announced_window_ != target_announced_window) { + const uint32_t announce = (uint32_t)GPR_CLAMP( + target_announced_window - announced_window_, 0, UINT32_MAX); + announced_window_ += announce; + return announce; } + return 0; } -// We have received data from the wire. We must track this in our own flow -// control bookkeeping. -// Returns an error if the incoming frame violates our flow control. -grpc_error* grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_stream_flowctl* sfc, - int64_t incoming_frame_size) { - uint32_t sent_init_window = - tfc->t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - uint32_t acked_init_window = - tfc->t->settings[GRPC_ACKED_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - PRETRACE(tfc, sfc); - if (incoming_frame_size > tfc->announced_window) { +grpc_error* TransportFlowControl::ValidateRecvData( + int64_t incoming_frame_size) { + if (incoming_frame_size > announced_window_) { char* msg; gpr_asprintf(&msg, "frame of size %" PRId64 " overflows local window of %" PRId64, - incoming_frame_size, tfc->announced_window); + incoming_frame_size, announced_window_); grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); return err; } + return GRPC_ERROR_NONE; +} - if (sfc != NULL) { - int64_t acked_stream_window = - sfc->announced_window_delta + acked_init_window; - int64_t sent_stream_window = sfc->announced_window_delta + sent_init_window; - if (incoming_frame_size > acked_stream_window) { - if (incoming_frame_size <= sent_stream_window) { - gpr_log( - GPR_ERROR, - "Incoming frame of size %" PRId64 - " exceeds local window size of %" PRId64 - ".\n" - "The (un-acked, future) window size would be %" PRId64 - " which is not exceeded.\n" - "This would usually cause a disconnection, but allowing it due to" - "broken HTTP2 implementations in the wild.\n" - "See (for example) https://github.com/netty/netty/issues/6520.", - incoming_frame_size, acked_stream_window, sent_stream_window); - } else { - char* msg; - gpr_asprintf(&msg, "frame of size %" PRId64 - " overflows local window of %" PRId64, - incoming_frame_size, acked_stream_window); - grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); - gpr_free(msg); - return err; - } - } - - announced_window_delta_preupdate(tfc, sfc); - sfc->announced_window_delta -= incoming_frame_size; - announced_window_delta_postupdate(tfc, sfc); - sfc->local_window_delta -= incoming_frame_size; - } +StreamFlowControl::StreamFlowControl(TransportFlowControl* tfc, + const grpc_chttp2_stream* s) + : tfc_(tfc), s_(s) {} - tfc->announced_window -= incoming_frame_size; +grpc_error* StreamFlowControl::RecvData(int64_t incoming_frame_size) { + FlowControlTrace trace(" data recv", tfc_, this); - POSTTRACE(tfc, sfc, " data recv"); - return GRPC_ERROR_NONE; -} + grpc_error* error = GRPC_ERROR_NONE; + error = tfc_->ValidateRecvData(incoming_frame_size); + if (error != GRPC_ERROR_NONE) return error; -// Returns a non zero announce integer if we should send a transport window -// update -uint32_t grpc_chttp2_flowctl_maybe_send_transport_update( - grpc_chttp2_transport_flowctl* tfc, bool writing_anyway) { - PRETRACE(tfc, NULL); - uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc); - uint32_t threshold_to_send_transport_window_update = - tfc->t->outbuf.count > 0 ? 3 * target_announced_window / 4 - : target_announced_window / 2; - if ((writing_anyway || - tfc->announced_window <= threshold_to_send_transport_window_update) && - tfc->announced_window != target_announced_window) { - uint32_t announce = (uint32_t)GPR_CLAMP( - target_announced_window - tfc->announced_window, 0, UINT32_MAX); - tfc->announced_window += announce; - POSTTRACE(tfc, NULL, "t updt sent"); - return announce; + uint32_t sent_init_window = + tfc_->transport()->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + uint32_t acked_init_window = + tfc_->transport()->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + + int64_t acked_stream_window = announced_window_delta_ + acked_init_window; + int64_t sent_stream_window = announced_window_delta_ + sent_init_window; + if (incoming_frame_size > acked_stream_window) { + if (incoming_frame_size <= sent_stream_window) { + gpr_log(GPR_ERROR, + "Incoming frame of size %" PRId64 + " exceeds local window size of %" PRId64 + ".\n" + "The (un-acked, future) window size would be %" PRId64 + " which is not exceeded.\n" + "This would usually cause a disconnection, but allowing it due to" + "broken HTTP2 implementations in the wild.\n" + "See (for example) https://github.com/netty/netty/issues/6520.", + incoming_frame_size, acked_stream_window, sent_stream_window); + } else { + char* msg; + gpr_asprintf(&msg, "frame of size %" PRId64 + " overflows local window of %" PRId64, + incoming_frame_size, acked_stream_window); + grpc_error* err = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); + gpr_free(msg); + return err; + } } - GRPC_FLOW_CONTROL_IF_TRACING( - gpr_log(GPR_DEBUG, "%p[0][%s] will not send transport update", tfc, - tfc->t->is_client ? "cli" : "svr")); - return 0; + + UpdateAnnouncedWindowDelta(tfc_, -incoming_frame_size); + local_window_delta_ -= incoming_frame_size; + tfc_->CommitRecvData(incoming_frame_size); + return GRPC_ERROR_NONE; } -// Returns a non zero announce integer if we should send a stream window update -uint32_t grpc_chttp2_flowctl_maybe_send_stream_update( - grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) { - PRETRACE(tfc, sfc); - if (sfc->local_window_delta > sfc->announced_window_delta) { +uint32_t StreamFlowControl::MaybeSendUpdate() { + FlowControlTrace trace("s updt sent", tfc_, this); + if (local_window_delta_ > announced_window_delta_) { uint32_t announce = (uint32_t)GPR_CLAMP( - sfc->local_window_delta - sfc->announced_window_delta, 0, UINT32_MAX); - announced_window_delta_preupdate(tfc, sfc); - sfc->announced_window_delta += announce; - announced_window_delta_postupdate(tfc, sfc); - POSTTRACE(tfc, sfc, "s updt sent"); + local_window_delta_ - announced_window_delta_, 0, UINT32_MAX); + UpdateAnnouncedWindowDelta(tfc_, announce); return announce; } - GRPC_FLOW_CONTROL_IF_TRACING( - gpr_log(GPR_DEBUG, "%p[%u][%s] will not send stream update", tfc, - sfc->s->id, tfc->t->is_client ? "cli" : "svr")); return 0; } -// we have received a WINDOW_UPDATE frame for a transport -void grpc_chttp2_flowctl_recv_transport_update( - grpc_chttp2_transport_flowctl* tfc, uint32_t size) { - PRETRACE(tfc, NULL); - tfc->remote_window += size; - POSTTRACE(tfc, NULL, "t updt recv"); -} - -// we have received a WINDOW_UPDATE frame for a stream -void grpc_chttp2_flowctl_recv_stream_update(grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_stream_flowctl* sfc, - uint32_t size) { - PRETRACE(tfc, sfc); - sfc->remote_window_delta += size; - POSTTRACE(tfc, sfc, "s updt recv"); -} - -void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_stream_flowctl* sfc, - size_t max_size_hint, - size_t have_already) { - PRETRACE(tfc, sfc); +void StreamFlowControl::IncomingByteStreamUpdate(size_t max_size_hint, + size_t have_already) { + FlowControlTrace trace("app st recv", tfc_, this); uint32_t max_recv_bytes; uint32_t sent_init_window = - tfc->t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + tfc_->transport()->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; /* clamp max recv hint to an allowable size */ if (max_size_hint >= UINT32_MAX - sent_init_window) { @@ -363,67 +274,18 @@ void grpc_chttp2_flowctl_incoming_bs_update(grpc_chttp2_transport_flowctl* tfc, /* add some small lookahead to keep pipelines flowing */ GPR_ASSERT(max_recv_bytes <= UINT32_MAX - sent_init_window); - if (sfc->local_window_delta < max_recv_bytes) { + if (local_window_delta_ < max_recv_bytes) { uint32_t add_max_recv_bytes = - (uint32_t)(max_recv_bytes - sfc->local_window_delta); - sfc->local_window_delta += add_max_recv_bytes; - } - POSTTRACE(tfc, sfc, "app st recv"); -} - -void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_stream_flowctl* sfc) { - announced_window_delta_preupdate(tfc, sfc); -} - -// Returns an urgency with which to make an update -static grpc_chttp2_flowctl_urgency delta_is_significant( - const grpc_chttp2_transport_flowctl* tfc, int32_t value, - grpc_chttp2_setting_id setting_id) { - int64_t delta = (int64_t)value - - (int64_t)tfc->t->settings[GRPC_LOCAL_SETTINGS][setting_id]; - // TODO(ncteisen): tune this - if (delta != 0 && (delta <= -value / 5 || delta >= value / 5)) { - return GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE; - } else { - return GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED; + (uint32_t)(max_recv_bytes - local_window_delta_); + local_window_delta_ += add_max_recv_bytes; } } -// Takes in a target and uses the pid controller to return a stabilized -// guess at the new bdp. -static double get_pid_controller_guess(grpc_exec_ctx* exec_ctx, - grpc_chttp2_transport_flowctl* tfc, - double target) { - grpc_millis now = grpc_exec_ctx_now(exec_ctx); - if (!tfc->pid_controller_initialized) { - tfc->last_pid_update = now; - tfc->pid_controller_initialized = true; - grpc_pid_controller_init( - &tfc->pid_controller, - (grpc_pid_controller_args){.gain_p = 4, - .gain_i = 8, - .gain_d = 0, - .initial_control_value = target, - .min_control_value = -1, - .max_control_value = 25, - .integral_range = 10}); - return pow(2, target); - } - double bdp_error = target - grpc_pid_controller_last(&tfc->pid_controller); - double dt = (double)(now - tfc->last_pid_update) * 1e-3; - double log2_bdp_guess = - grpc_pid_controller_update(&tfc->pid_controller, bdp_error, dt); - tfc->last_pid_update = now; - return pow(2, log2_bdp_guess); -} - // Take in a target and modifies it based on the memory pressure of the system -static double get_target_under_memory_pressure( - grpc_chttp2_transport_flowctl* tfc, double target) { +static double AdjustForMemoryPressure(grpc_resource_quota* quota, + double target) { // do not increase window under heavy memory pressure. - double memory_pressure = grpc_resource_quota_get_memory_pressure( - grpc_resource_user_quota(grpc_endpoint_get_resource_user(tfc->t->ep))); + double memory_pressure = grpc_resource_quota_get_memory_pressure(quota); static const double kLowMemPressure = 0.1; static const double kZeroTarget = 22; static const double kHighMemPressure = 0.8; @@ -438,78 +300,82 @@ static double get_target_under_memory_pressure( return target; } -grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action( - grpc_exec_ctx* exec_ctx, grpc_chttp2_transport_flowctl* tfc, - grpc_chttp2_stream_flowctl* sfc) { - grpc_chttp2_flowctl_action action; - memset(&action, 0, sizeof(action)); +double TransportFlowControl::TargetLogBdp() { + return AdjustForMemoryPressure( + grpc_resource_user_quota(grpc_endpoint_get_resource_user(t_->ep)), + 1 + log2(bdp_estimator_.EstimateBdp())); +} + +double TransportFlowControl::SmoothLogBdp(grpc_exec_ctx* exec_ctx, + double value) { + grpc_millis now = grpc_exec_ctx_now(exec_ctx); + double bdp_error = value - pid_controller_.last_control_value(); + const double dt = (double)(now - last_pid_update_) * 1e-3; + last_pid_update_ = now; + return pid_controller_.Update(bdp_error, dt); +} + +FlowControlAction::Urgency TransportFlowControl::DeltaUrgency( + int32_t value, grpc_chttp2_setting_id setting_id) { + int64_t delta = + (int64_t)value - (int64_t)t_->settings[GRPC_LOCAL_SETTINGS][setting_id]; // TODO(ncteisen): tune this - if (sfc != NULL && !sfc->s->read_closed) { - uint32_t sent_init_window = - tfc->t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - if ((int64_t)sfc->local_window_delta > - (int64_t)sfc->announced_window_delta && - (int64_t)sfc->announced_window_delta + sent_init_window <= - sent_init_window / 2) { - action.send_stream_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY; - } else if (sfc->local_window_delta > sfc->announced_window_delta) { - action.send_stream_update = GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE; - } + if (delta != 0 && (delta <= -value / 5 || delta >= value / 5)) { + return FlowControlAction::Urgency::QUEUE_UPDATE; + } else { + return FlowControlAction::Urgency::NO_ACTION_NEEDED; } - if (tfc->enable_bdp_probe) { - action.need_ping = - grpc_bdp_estimator_need_ping(exec_ctx, &tfc->bdp_estimator); +} +FlowControlAction TransportFlowControl::PeriodicUpdate( + grpc_exec_ctx* exec_ctx) { + FlowControlAction action; + if (enable_bdp_probe_) { // get bdp estimate and update initial_window accordingly. - int64_t estimate = -1; - if (grpc_bdp_estimator_get_estimate(&tfc->bdp_estimator, &estimate)) { - double target = 1 + log2((double)estimate); - - // target might change based on how much memory pressure we are under - // TODO(ncteisen): experiment with setting target to be huge under low - // memory pressure. - target = get_target_under_memory_pressure(tfc, target); - - // run our target through the pid controller to stabilize change. - // TODO(ncteisen): experiment with other controllers here. - double bdp_guess = get_pid_controller_guess(exec_ctx, tfc, target); - - // Though initial window 'could' drop to 0, we keep the floor at 128 - tfc->target_initial_window_size = - (int32_t)GPR_CLAMP(bdp_guess, 128, INT32_MAX); - - grpc_chttp2_flowctl_urgency init_window_update_urgency = - delta_is_significant(tfc, tfc->target_initial_window_size, - GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE); - if (init_window_update_urgency != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) { - action.send_setting_update = init_window_update_urgency; - action.initial_window_size = (uint32_t)tfc->target_initial_window_size; - } - } + // target might change based on how much memory pressure we are under + // TODO(ncteisen): experiment with setting target to be huge under low + // memory pressure. + const double target = pow(2, SmoothLogBdp(exec_ctx, TargetLogBdp())); + + // Though initial window 'could' drop to 0, we keep the floor at 128 + target_initial_window_size_ = (int32_t)GPR_CLAMP(target, 128, INT32_MAX); + + action.set_send_initial_window_update( + DeltaUrgency(target_initial_window_size_, + GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE), + target_initial_window_size_); // get bandwidth estimate and update max_frame accordingly. - double bw_dbl = -1; - if (grpc_bdp_estimator_get_bw(&tfc->bdp_estimator, &bw_dbl)) { - // we target the max of BDP or bandwidth in microseconds. - int32_t frame_size = (int32_t)GPR_CLAMP( - GPR_MAX((int32_t)GPR_CLAMP(bw_dbl, 0, INT_MAX) / 1000, - tfc->target_initial_window_size), - 16384, 16777215); - grpc_chttp2_flowctl_urgency frame_size_urgency = delta_is_significant( - tfc, frame_size, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE); - if (frame_size_urgency != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) { - if (frame_size_urgency > action.send_setting_update) { - action.send_setting_update = frame_size_urgency; - } - action.max_frame_size = (uint32_t)frame_size; - } - } + double bw_dbl = bdp_estimator_.EstimateBandwidth(); + // we target the max of BDP or bandwidth in microseconds. + int32_t frame_size = (int32_t)GPR_CLAMP( + GPR_MAX((int32_t)GPR_CLAMP(bw_dbl, 0, INT_MAX) / 1000, + target_initial_window_size_), + 16384, 16777215); + action.set_send_max_frame_size_update( + DeltaUrgency(frame_size, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE), + frame_size); } - uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc); - if (tfc->announced_window < target_announced_window / 2) { - action.send_transport_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY; + return UpdateAction(action); +} + +FlowControlAction StreamFlowControl::UpdateAction(FlowControlAction action) { + // TODO(ncteisen): tune this + if (!s_->read_closed) { + uint32_t sent_init_window = + tfc_->transport()->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + if (local_window_delta_ > announced_window_delta_ && + announced_window_delta_ + sent_init_window <= sent_init_window / 2) { + action.set_send_stream_update( + FlowControlAction::Urgency::UPDATE_IMMEDIATELY); + } else if (local_window_delta_ > announced_window_delta_) { + action.set_send_stream_update(FlowControlAction::Urgency::QUEUE_UPDATE); + } } - TRACEACTION(tfc, action); + return action; } + +} // namespace chttp2 +} // namespace grpc_core |