aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/transport')
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.cc37
-rw-r--r--src/core/ext/transport/chttp2/transport/flow_control.cc19
-rw-r--r--src/core/ext/transport/chttp2/transport/flow_control.h240
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_settings.cc6
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h10
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.cc7
-rw-r--r--src/core/ext/transport/chttp2/transport/stream_lists.cc2
7 files changed, 264 insertions, 57 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index 7c77de2168..5ee1f08e61 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -232,6 +232,9 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport* t) { gpr_ref(&t->refs); }
static const grpc_transport_vtable* get_vtable(void);
+// -1 == unset, 0 == disabled, 1 == enabled
+static int flow_control_enabled = -1;
+
static void init_transport(grpc_chttp2_transport* t,
const grpc_channel_args* channel_args,
grpc_endpoint* ep, bool is_client) {
@@ -517,7 +520,23 @@ static void init_transport(grpc_chttp2_transport* t,
}
}
- t->flow_control.Init(t, enable_bdp);
+ if (flow_control_enabled == -1) {
+ char* env_variable = gpr_getenv("GRPC_EXPERIMENTAL_DISABLE_FLOW_CONTROL");
+ if (env_variable != nullptr) {
+ flow_control_enabled = 0;
+ } else {
+ flow_control_enabled = 1;
+ }
+ gpr_free(env_variable);
+ }
+
+ if (flow_control_enabled) {
+ t->flow_control.Init<grpc_core::chttp2::TransportFlowControl>(t,
+ enable_bdp);
+ } else {
+ t->flow_control.Init<grpc_core::chttp2::TransportFlowControlDisabled>(t);
+ enable_bdp = false;
+ }
/* No pings allowed before receiving a header or data frame. */
t->ping_state.pings_before_data_required = 0;
@@ -682,7 +701,14 @@ static int init_stream(grpc_transport* gt, grpc_stream* gs,
post_destructive_reclaimer(t);
}
- s->flow_control.Init(t->flow_control.get(), s);
+ if (t->flow_control->flow_control_enabled()) {
+ s->flow_control.Init<grpc_core::chttp2::StreamFlowControl>(
+ static_cast<grpc_core::chttp2::TransportFlowControl*>(
+ t->flow_control.get()),
+ s);
+ } else {
+ s->flow_control.Init<grpc_core::chttp2::StreamFlowControlDisabled>();
+ }
GPR_TIMER_END("init_stream", 0);
return 0;
@@ -2402,8 +2428,11 @@ static void read_action_locked(void* tp, grpc_error* error) {
grpc_error* errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
GRPC_ERROR_NONE};
for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
- t->flow_control->bdp_estimator()->AddIncomingBytes(
- (int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i]));
+ grpc_core::BdpEstimator* bdp_est = t->flow_control->bdp_estimator();
+ if (bdp_est) {
+ bdp_est->AddIncomingBytes(
+ (int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i]));
+ }
errors[1] = grpc_chttp2_perform_read(t, t->read_buffer.slices[i]);
}
if (errors[1] != GRPC_ERROR_NONE) {
diff --git a/src/core/ext/transport/chttp2/transport/flow_control.cc b/src/core/ext/transport/chttp2/transport/flow_control.cc
index ca48cc7e0a..b4864077bd 100644
--- a/src/core/ext/transport/chttp2/transport/flow_control.cc
+++ b/src/core/ext/transport/chttp2/transport/flow_control.cc
@@ -149,6 +149,25 @@ void FlowControlAction::Trace(grpc_chttp2_transport* t) const {
gpr_free(mf_str);
}
+TransportFlowControlDisabled::TransportFlowControlDisabled(
+ grpc_chttp2_transport* t) {
+ remote_window_ = kMaxWindow;
+ target_initial_window_size_ = kMaxWindow;
+ announced_window_ = kMaxWindow;
+ t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] =
+ kFrameSize;
+ t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] =
+ kFrameSize;
+ t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] =
+ kFrameSize;
+ t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] =
+ kMaxWindow;
+ t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] =
+ kMaxWindow;
+ t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] =
+ kMaxWindow;
+}
+
TransportFlowControl::TransportFlowControl(const grpc_chttp2_transport* t,
bool enable_bdp_probe)
: t_(t),
diff --git a/src/core/ext/transport/chttp2/transport/flow_control.h b/src/core/ext/transport/chttp2/transport/flow_control.h
index 8306047dbc..789f755c38 100644
--- a/src/core/ext/transport/chttp2/transport/flow_control.h
+++ b/src/core/ext/transport/chttp2/transport/flow_control.h
@@ -24,6 +24,7 @@
#include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/http2_settings.h"
+#include "src/core/lib/support/abstract.h"
#include "src/core/lib/support/manual_constructor.h"
#include "src/core/lib/transport/bdp_estimator.h"
#include "src/core/lib/transport/pid_controller.h"
@@ -43,10 +44,16 @@ namespace grpc_core {
namespace chttp2 {
static constexpr uint32_t kDefaultWindow = 65535;
+static constexpr int64_t kMaxWindow = (int64_t)((1u << 31) - 1);
+// TODO(ncteisen): Tune this
+static constexpr uint32_t kFrameSize = 1024 * 1024;
class TransportFlowControl;
class StreamFlowControl;
+// Encapsulates a collections of actions the transport needs to take with
+// regard to flow control. Each action comes with urgencies that tell the
+// transport how quickly the action must take place.
class FlowControlAction {
public:
enum class Urgency : uint8_t {
@@ -132,36 +139,122 @@ class FlowControlTrace {
int64_t announced_window_delta_;
};
-class TransportFlowControl {
+// Fat interface with all methods a flow control implementation needs to
+// support. gRPC C Core does not support pure virtual functions, so instead
+// we abort in any methods which require implementation in the base class.
+class TransportFlowControlBase {
+ public:
+ TransportFlowControlBase() {}
+ virtual ~TransportFlowControlBase() {}
+
+ // Is flow control enabled? This is needed in other codepaths like the checks
+ // in parsing and in writing.
+ virtual bool flow_control_enabled() const { abort(); }
+
+ // Called to check if the transport needs to send a WINDOW_UPDATE frame
+ virtual uint32_t MaybeSendUpdate(bool writing_anyway) { abort(); }
+
+ // Using the protected members, returns and Action to be taken by the
+ // tranport.
+ virtual FlowControlAction MakeAction() { abort(); }
+
+ // Using the protected members, returns and Action to be taken by the
+ // tranport. Also checks for updates to our BDP estimate and acts
+ // accordingly.
+ virtual FlowControlAction PeriodicUpdate() { abort(); }
+
+ // Called to do bookkeeping when a stream owned by this transport sends
+ // data on the wire
+ virtual void StreamSentData(int64_t size) { abort(); }
+
+ // Called to do bookkeeping when a stream owned by this transport receives
+ // data from the wire. Also does error checking for frame size.
+ virtual grpc_error* RecvData(int64_t incoming_frame_size) { abort(); }
+
+ // Called to do bookkeeping when we receive a WINDOW_UPDATE frame.
+ virtual void RecvUpdate(uint32_t size) { abort(); }
+
+ // Returns the BdpEstimator held by this object. Caller is responsible for
+ // checking for nullptr. TODO(ncteisen): consider fully encapsulating all
+ // bdp estimator actions inside TransportFlowControl
+ virtual BdpEstimator* bdp_estimator() { return nullptr; }
+
+ // Getters
+ int64_t remote_window() const { return remote_window_; }
+ virtual int64_t target_window() const { return target_initial_window_size_; }
+ int64_t announced_window() const { return announced_window_; }
+
+ // Used in certain benchmarks in which we don't want FlowControl to be a
+ // factor
+ virtual void TestOnlyForceHugeWindow() {}
+
+ GRPC_ABSTRACT_BASE_CLASS
+
+ protected:
+ friend class ::grpc::testing::TrickledCHTTP2;
+ int64_t remote_window_ = kDefaultWindow;
+ int64_t target_initial_window_size_ = kDefaultWindow;
+ int64_t announced_window_ = kDefaultWindow;
+};
+
+// Implementation of flow control that does NOTHING. Always returns maximum
+// values, never initiates writes, and assumes that the remote peer is doing
+// the same. To be used to narrow down on flow control as the cause of negative
+// performance.
+class TransportFlowControlDisabled final : public TransportFlowControlBase {
+ public:
+ // Maxes out all values
+ TransportFlowControlDisabled(grpc_chttp2_transport* t);
+
+ bool flow_control_enabled() const override { return false; }
+
+ // Never do anything.
+ uint32_t MaybeSendUpdate(bool writing_anyway) override { return 0; }
+ FlowControlAction MakeAction() override { return FlowControlAction(); }
+ FlowControlAction PeriodicUpdate() override { return FlowControlAction(); }
+ void StreamSentData(int64_t size) override {}
+ grpc_error* RecvData(int64_t incoming_frame_size) override {
+ return GRPC_ERROR_NONE;
+ }
+ void RecvUpdate(uint32_t size) override {}
+};
+
+// Implementation of flow control that abides to HTTP/2 spec and attempts
+// to be as performant as possible.
+class TransportFlowControl final : public TransportFlowControlBase {
public:
TransportFlowControl(const grpc_chttp2_transport* t, bool enable_bdp_probe);
~TransportFlowControl() {}
+ bool flow_control_enabled() const override { return true; }
+
bool bdp_probe() const { return enable_bdp_probe_; }
// returns an announce if we should send a transport update to our peer,
// else returns zero; writing_anyway indicates if a write would happen
// regardless of the send - if it is false and this function returns non-zero,
// this announce will cause a write to occur
- uint32_t MaybeSendUpdate(bool writing_anyway);
+ uint32_t MaybeSendUpdate(bool writing_anyway) override;
// Reads the flow control data and returns and actionable struct that will
// tell chttp2 exactly what it needs to do
- FlowControlAction MakeAction() { return UpdateAction(FlowControlAction()); }
+ FlowControlAction MakeAction() override {
+ return UpdateAction(FlowControlAction());
+ }
// Call periodically (at a low-ish rate, 100ms - 10s makes sense)
// to perform more complex flow control calculations and return an action
// to let chttp2 change its parameters
- FlowControlAction PeriodicUpdate();
+ FlowControlAction PeriodicUpdate() override;
- void StreamSentData(int64_t size) { remote_window_ -= size; }
+ void StreamSentData(int64_t size) override { remote_window_ -= size; }
grpc_error* ValidateRecvData(int64_t incoming_frame_size);
void CommitRecvData(int64_t incoming_frame_size) {
announced_window_ -= incoming_frame_size;
}
- grpc_error* RecvData(int64_t incoming_frame_size) {
+ grpc_error* RecvData(int64_t incoming_frame_size) override {
FlowControlTrace trace(" data recv", this, nullptr);
grpc_error* error = ValidateRecvData(incoming_frame_size);
if (error != GRPC_ERROR_NONE) return error;
@@ -170,18 +263,18 @@ class TransportFlowControl {
}
// we have received a WINDOW_UPDATE frame for a transport
- void RecvUpdate(uint32_t size) {
+ void RecvUpdate(uint32_t size) override {
FlowControlTrace trace("t updt recv", this, nullptr);
remote_window_ += size;
}
- int64_t remote_window() const { return remote_window_; }
- int64_t target_window() const {
+ // See comment above announced_stream_total_over_incoming_window_ for the
+ // logic behind this decision.
+ int64_t target_window() const override {
return (uint32_t)GPR_MIN((int64_t)((1u << 31) - 1),
announced_stream_total_over_incoming_window_ +
target_initial_window_size_);
}
- int64_t announced_window() const { return announced_window_; }
const grpc_chttp2_transport* transport() const { return t_; }
@@ -201,15 +294,14 @@ class TransportFlowControl {
}
}
- BdpEstimator* bdp_estimator() { return &bdp_estimator_; }
+ BdpEstimator* bdp_estimator() override { return &bdp_estimator_; }
- void TestOnlyForceHugeWindow() {
+ void TestOnlyForceHugeWindow() override {
announced_window_ = 1024 * 1024 * 1024;
remote_window_ = 1024 * 1024 * 1024;
}
private:
- friend class ::grpc::testing::TrickledCHTTP2;
double TargetLogBdp();
double SmoothLogBdp(double value);
FlowControlAction::Urgency DeltaUrgency(int32_t value,
@@ -225,9 +317,6 @@ class TransportFlowControl {
const grpc_chttp2_transport* const t_;
- /** Our bookkeeping for the remote peer's available window */
- int64_t remote_window_ = kDefaultWindow;
-
/** calculating what we should give for local window:
we track the total amount of flow control over initial window size
across all streams: this is data that we want to receive right now (it
@@ -239,13 +328,6 @@ class TransportFlowControl {
int64_t announced_stream_total_over_incoming_window_ = 0;
int64_t announced_stream_total_under_incoming_window_ = 0;
- /** This is out window according to what we have sent to our remote peer. The
- * difference between this and target window is what we use to decide when
- * to send WINDOW_UPDATE frames. */
- int64_t announced_window_ = kDefaultWindow;
-
- int32_t target_initial_window_size_ = kDefaultWindow;
-
/** should we probe bdp? */
const bool enable_bdp_probe_;
@@ -257,39 +339,117 @@ class TransportFlowControl {
grpc_millis last_pid_update_ = 0;
};
-class StreamFlowControl {
+// Fat interface with all methods a stream flow control implementation needs
+// to support. gRPC C Core does not support pure virtual functions, so instead
+// we abort in any methods which require implementation in the base class.
+class StreamFlowControlBase {
+ public:
+ StreamFlowControlBase() {}
+ virtual ~StreamFlowControlBase() {}
+
+ // Updates an action using the protected members.
+ virtual FlowControlAction UpdateAction(FlowControlAction action) { abort(); }
+
+ // Using the protected members, returns an Action for this stream to be
+ // taken by the tranport.
+ virtual FlowControlAction MakeAction() { abort(); }
+
+ // Bookkeeping for when data is sent on this stream.
+ virtual void SentData(int64_t outgoing_frame_size) { abort(); }
+
+ // Bookkeeping and error checking for when data is received by this stream.
+ virtual grpc_error* RecvData(int64_t incoming_frame_size) { abort(); }
+
+ // Called to check if this stream needs to send a WINDOW_UPDATE frame.
+ virtual uint32_t MaybeSendUpdate() { abort(); }
+
+ // Bookkeeping for receiving a WINDOW_UPDATE from for this stream.
+ virtual void RecvUpdate(uint32_t size) { abort(); }
+
+ // Bookkeeping for when a call pulls bytes out of the transport. At this
+ // point we consider the data 'used' and can thus let out peer know we are
+ // ready for more data.
+ virtual void IncomingByteStreamUpdate(size_t max_size_hint,
+ size_t have_already) {
+ abort();
+ }
+
+ // Used in certain benchmarks in which we don't want FlowControl to be a
+ // factor
+ virtual void TestOnlyForceHugeWindow() {}
+
+ // Getters
+ int64_t remote_window_delta() { return remote_window_delta_; }
+ int64_t local_window_delta() { return local_window_delta_; }
+ int64_t announced_window_delta() { return announced_window_delta_; }
+
+ GRPC_ABSTRACT_BASE_CLASS
+
+ protected:
+ friend class ::grpc::testing::TrickledCHTTP2;
+ int64_t remote_window_delta_ = 0;
+ int64_t local_window_delta_ = 0;
+ int64_t announced_window_delta_ = 0;
+};
+
+// Implementation of flow control that does NOTHING. Always returns maximum
+// values, never initiates writes, and assumes that the remote peer is doing
+// the same. To be used to narrow down on flow control as the cause of negative
+// performance.
+class StreamFlowControlDisabled : public StreamFlowControlBase {
+ public:
+ FlowControlAction UpdateAction(FlowControlAction action) override {
+ return action;
+ }
+ FlowControlAction MakeAction() override { return FlowControlAction(); }
+ void SentData(int64_t outgoing_frame_size) override {}
+ grpc_error* RecvData(int64_t incoming_frame_size) override {
+ return GRPC_ERROR_NONE;
+ }
+ uint32_t MaybeSendUpdate() override { return 0; }
+ void RecvUpdate(uint32_t size) override {}
+ void IncomingByteStreamUpdate(size_t max_size_hint,
+ size_t have_already) override {}
+};
+
+// Implementation of flow control that abides to HTTP/2 spec and attempts
+// to be as performant as possible.
+class StreamFlowControl final : public StreamFlowControlBase {
public:
StreamFlowControl(TransportFlowControl* tfc, const grpc_chttp2_stream* s);
~StreamFlowControl() {
tfc_->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
}
- FlowControlAction UpdateAction(FlowControlAction action);
- FlowControlAction MakeAction() { return UpdateAction(tfc_->MakeAction()); }
+ FlowControlAction UpdateAction(FlowControlAction action) override;
+ FlowControlAction MakeAction() override {
+ return UpdateAction(tfc_->MakeAction());
+ }
// we have sent data on the wire, we must track this in our bookkeeping for
// the remote peer's flow control.
- void SentData(int64_t outgoing_frame_size) {
+ void SentData(int64_t outgoing_frame_size) override {
FlowControlTrace tracer(" data sent", tfc_, this);
tfc_->StreamSentData(outgoing_frame_size);
remote_window_delta_ -= outgoing_frame_size;
}
// we have received data from the wire
- grpc_error* RecvData(int64_t incoming_frame_size);
+ grpc_error* RecvData(int64_t incoming_frame_size) override;
// returns an announce if we should send a stream update to our peer, else
// returns zero
- uint32_t MaybeSendUpdate();
+ uint32_t MaybeSendUpdate() override;
// we have received a WINDOW_UPDATE frame for a stream
- void RecvUpdate(uint32_t size) {
+ void RecvUpdate(uint32_t size) override {
FlowControlTrace trace("s updt recv", tfc_, this);
remote_window_delta_ += size;
}
// the application is asking for a certain amount of bytes
- void IncomingByteStreamUpdate(size_t max_size_hint, size_t have_already);
+ void IncomingByteStreamUpdate(size_t max_size_hint,
+ size_t have_already) override;
int64_t remote_window_delta() const { return remote_window_delta_; }
int64_t local_window_delta() const { return local_window_delta_; }
@@ -297,14 +457,13 @@ class StreamFlowControl {
const grpc_chttp2_stream* stream() const { return s_; }
- void TestOnlyForceHugeWindow() {
+ void TestOnlyForceHugeWindow() override {
announced_window_delta_ = 1024 * 1024 * 1024;
local_window_delta_ = 1024 * 1024 * 1024;
remote_window_delta_ = 1024 * 1024 * 1024;
}
private:
- friend class ::grpc::testing::TrickledCHTTP2;
TransportFlowControl* const tfc_;
const grpc_chttp2_stream* const s_;
@@ -313,21 +472,6 @@ class StreamFlowControl {
announced_window_delta_ += change;
tfc->PostUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
}
-
- /** window available for us to send to peer, over or under the initial
- * window
- * size of the transport... ie:
- * remote_window = remote_window_delta + transport.initial_window_size */
- int64_t remote_window_delta_ = 0;
-
- /** window available for peer to send to us (as a delta on
- * transport.initial_window_size)
- * local_window = local_window_delta + transport.initial_window_size */
- int64_t local_window_delta_ = 0;
-
- /** window available for peer to send to us over this stream that we have
- * announced to the peer */
- int64_t announced_window_delta_ = 0;
};
} // namespace chttp2
diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.cc b/src/core/ext/transport/chttp2/transport/frame_settings.cc
index c6c2a6c301..0d245f4ba1 100644
--- a/src/core/ext/transport/chttp2/transport/frame_settings.cc
+++ b/src/core/ext/transport/chttp2/transport/frame_settings.cc
@@ -186,6 +186,12 @@ grpc_error* grpc_chttp2_settings_parser_parse(void* p, grpc_chttp2_transport* t,
if (grpc_wire_id_to_setting_id(parser->id, &id)) {
const grpc_chttp2_setting_parameters* sp =
&grpc_chttp2_settings_parameters[id];
+ // If flow control is disabled we skip these.
+ if (!t->flow_control->flow_control_enabled() &&
+ (id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE ||
+ id == GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE)) {
+ continue;
+ }
if (parser->value < sp->min_value || parser->value > sp->max_value) {
switch (sp->invalid_value_behavior) {
case GRPC_CHTTP2_CLAMP_INVALID_VALUE:
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 932f5ba83d..8852d9ee2b 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -351,7 +351,10 @@ struct grpc_chttp2_transport {
/** parser for goaway frames */
grpc_chttp2_goaway_parser goaway_parser;
- grpc_core::ManualConstructor<grpc_core::chttp2::TransportFlowControl>
+ grpc_core::PolymorphicManualConstructor<
+ grpc_core::chttp2::TransportFlowControlBase,
+ grpc_core::chttp2::TransportFlowControl,
+ grpc_core::chttp2::TransportFlowControlDisabled>
flow_control;
/** initial window change. This is tracked as we parse settings frames from
* the remote peer. If there is a positive delta, then we will make all
@@ -525,7 +528,10 @@ struct grpc_chttp2_stream {
bool sent_initial_metadata;
bool sent_trailing_metadata;
- grpc_core::ManualConstructor<grpc_core::chttp2::StreamFlowControl>
+ grpc_core::PolymorphicManualConstructor<
+ grpc_core::chttp2::StreamFlowControlBase,
+ grpc_core::chttp2::StreamFlowControl,
+ grpc_core::chttp2::StreamFlowControlDisabled>
flow_control;
grpc_slice_buffer flow_controlled_buffer;
diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc
index a56f89cc75..59dc38ef98 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.cc
+++ b/src/core/ext/transport/chttp2/transport/parsing.cc
@@ -186,9 +186,10 @@ grpc_error* grpc_chttp2_perform_read(grpc_chttp2_transport* t,
return GRPC_ERROR_NONE;
}
goto dts_fh_0; /* loop */
- } else if (t->incoming_frame_size >
- t->settings[GRPC_ACKED_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]) {
+ } else if (t->flow_control->flow_control_enabled() &&
+ t->incoming_frame_size >
+ t->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]) {
char* msg;
gpr_asprintf(&msg, "Frame size %d is larger than max frame size %d",
t->incoming_frame_size,
diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.cc b/src/core/ext/transport/chttp2/transport/stream_lists.cc
index c95d02541a..3aad8c5823 100644
--- a/src/core/ext/transport/chttp2/transport/stream_lists.cc
+++ b/src/core/ext/transport/chttp2/transport/stream_lists.cc
@@ -183,6 +183,7 @@ void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport* t,
void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport* t,
grpc_chttp2_stream* s) {
+ GPR_ASSERT(t->flow_control->flow_control_enabled());
stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
}
@@ -198,6 +199,7 @@ void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport* t,
void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport* t,
grpc_chttp2_stream* s) {
+ GPR_ASSERT(t->flow_control->flow_control_enabled());
stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
}