diff options
Diffstat (limited to 'src/core/ext/transport')
7 files changed, 264 insertions, 57 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 7c77de2168..5ee1f08e61 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -232,6 +232,9 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport* t) { gpr_ref(&t->refs); } static const grpc_transport_vtable* get_vtable(void); +// -1 == unset, 0 == disabled, 1 == enabled +static int flow_control_enabled = -1; + static void init_transport(grpc_chttp2_transport* t, const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client) { @@ -517,7 +520,23 @@ static void init_transport(grpc_chttp2_transport* t, } } - t->flow_control.Init(t, enable_bdp); + if (flow_control_enabled == -1) { + char* env_variable = gpr_getenv("GRPC_EXPERIMENTAL_DISABLE_FLOW_CONTROL"); + if (env_variable != nullptr) { + flow_control_enabled = 0; + } else { + flow_control_enabled = 1; + } + gpr_free(env_variable); + } + + if (flow_control_enabled) { + t->flow_control.Init<grpc_core::chttp2::TransportFlowControl>(t, + enable_bdp); + } else { + t->flow_control.Init<grpc_core::chttp2::TransportFlowControlDisabled>(t); + enable_bdp = false; + } /* No pings allowed before receiving a header or data frame. */ t->ping_state.pings_before_data_required = 0; @@ -682,7 +701,14 @@ static int init_stream(grpc_transport* gt, grpc_stream* gs, post_destructive_reclaimer(t); } - s->flow_control.Init(t->flow_control.get(), s); + if (t->flow_control->flow_control_enabled()) { + s->flow_control.Init<grpc_core::chttp2::StreamFlowControl>( + static_cast<grpc_core::chttp2::TransportFlowControl*>( + t->flow_control.get()), + s); + } else { + s->flow_control.Init<grpc_core::chttp2::StreamFlowControlDisabled>(); + } GPR_TIMER_END("init_stream", 0); return 0; @@ -2402,8 +2428,11 @@ static void read_action_locked(void* tp, grpc_error* error) { grpc_error* errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE, GRPC_ERROR_NONE}; for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) { - t->flow_control->bdp_estimator()->AddIncomingBytes( - (int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i])); + grpc_core::BdpEstimator* bdp_est = t->flow_control->bdp_estimator(); + if (bdp_est) { + bdp_est->AddIncomingBytes( + (int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i])); + } errors[1] = grpc_chttp2_perform_read(t, t->read_buffer.slices[i]); } if (errors[1] != GRPC_ERROR_NONE) { diff --git a/src/core/ext/transport/chttp2/transport/flow_control.cc b/src/core/ext/transport/chttp2/transport/flow_control.cc index ca48cc7e0a..b4864077bd 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.cc +++ b/src/core/ext/transport/chttp2/transport/flow_control.cc @@ -149,6 +149,25 @@ void FlowControlAction::Trace(grpc_chttp2_transport* t) const { gpr_free(mf_str); } +TransportFlowControlDisabled::TransportFlowControlDisabled( + grpc_chttp2_transport* t) { + remote_window_ = kMaxWindow; + target_initial_window_size_ = kMaxWindow; + announced_window_ = kMaxWindow; + t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] = + kFrameSize; + t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] = + kFrameSize; + t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] = + kFrameSize; + t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] = + kMaxWindow; + t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] = + kMaxWindow; + t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] = + kMaxWindow; +} + TransportFlowControl::TransportFlowControl(const grpc_chttp2_transport* t, bool enable_bdp_probe) : t_(t), diff --git a/src/core/ext/transport/chttp2/transport/flow_control.h b/src/core/ext/transport/chttp2/transport/flow_control.h index 8306047dbc..789f755c38 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.h +++ b/src/core/ext/transport/chttp2/transport/flow_control.h @@ -24,6 +24,7 @@ #include <grpc/support/useful.h> #include "src/core/ext/transport/chttp2/transport/http2_settings.h" +#include "src/core/lib/support/abstract.h" #include "src/core/lib/support/manual_constructor.h" #include "src/core/lib/transport/bdp_estimator.h" #include "src/core/lib/transport/pid_controller.h" @@ -43,10 +44,16 @@ namespace grpc_core { namespace chttp2 { static constexpr uint32_t kDefaultWindow = 65535; +static constexpr int64_t kMaxWindow = (int64_t)((1u << 31) - 1); +// TODO(ncteisen): Tune this +static constexpr uint32_t kFrameSize = 1024 * 1024; class TransportFlowControl; class StreamFlowControl; +// Encapsulates a collections of actions the transport needs to take with +// regard to flow control. Each action comes with urgencies that tell the +// transport how quickly the action must take place. class FlowControlAction { public: enum class Urgency : uint8_t { @@ -132,36 +139,122 @@ class FlowControlTrace { int64_t announced_window_delta_; }; -class TransportFlowControl { +// Fat interface with all methods a flow control implementation needs to +// support. gRPC C Core does not support pure virtual functions, so instead +// we abort in any methods which require implementation in the base class. +class TransportFlowControlBase { + public: + TransportFlowControlBase() {} + virtual ~TransportFlowControlBase() {} + + // Is flow control enabled? This is needed in other codepaths like the checks + // in parsing and in writing. + virtual bool flow_control_enabled() const { abort(); } + + // Called to check if the transport needs to send a WINDOW_UPDATE frame + virtual uint32_t MaybeSendUpdate(bool writing_anyway) { abort(); } + + // Using the protected members, returns and Action to be taken by the + // tranport. + virtual FlowControlAction MakeAction() { abort(); } + + // Using the protected members, returns and Action to be taken by the + // tranport. Also checks for updates to our BDP estimate and acts + // accordingly. + virtual FlowControlAction PeriodicUpdate() { abort(); } + + // Called to do bookkeeping when a stream owned by this transport sends + // data on the wire + virtual void StreamSentData(int64_t size) { abort(); } + + // Called to do bookkeeping when a stream owned by this transport receives + // data from the wire. Also does error checking for frame size. + virtual grpc_error* RecvData(int64_t incoming_frame_size) { abort(); } + + // Called to do bookkeeping when we receive a WINDOW_UPDATE frame. + virtual void RecvUpdate(uint32_t size) { abort(); } + + // Returns the BdpEstimator held by this object. Caller is responsible for + // checking for nullptr. TODO(ncteisen): consider fully encapsulating all + // bdp estimator actions inside TransportFlowControl + virtual BdpEstimator* bdp_estimator() { return nullptr; } + + // Getters + int64_t remote_window() const { return remote_window_; } + virtual int64_t target_window() const { return target_initial_window_size_; } + int64_t announced_window() const { return announced_window_; } + + // Used in certain benchmarks in which we don't want FlowControl to be a + // factor + virtual void TestOnlyForceHugeWindow() {} + + GRPC_ABSTRACT_BASE_CLASS + + protected: + friend class ::grpc::testing::TrickledCHTTP2; + int64_t remote_window_ = kDefaultWindow; + int64_t target_initial_window_size_ = kDefaultWindow; + int64_t announced_window_ = kDefaultWindow; +}; + +// Implementation of flow control that does NOTHING. Always returns maximum +// values, never initiates writes, and assumes that the remote peer is doing +// the same. To be used to narrow down on flow control as the cause of negative +// performance. +class TransportFlowControlDisabled final : public TransportFlowControlBase { + public: + // Maxes out all values + TransportFlowControlDisabled(grpc_chttp2_transport* t); + + bool flow_control_enabled() const override { return false; } + + // Never do anything. + uint32_t MaybeSendUpdate(bool writing_anyway) override { return 0; } + FlowControlAction MakeAction() override { return FlowControlAction(); } + FlowControlAction PeriodicUpdate() override { return FlowControlAction(); } + void StreamSentData(int64_t size) override {} + grpc_error* RecvData(int64_t incoming_frame_size) override { + return GRPC_ERROR_NONE; + } + void RecvUpdate(uint32_t size) override {} +}; + +// Implementation of flow control that abides to HTTP/2 spec and attempts +// to be as performant as possible. +class TransportFlowControl final : public TransportFlowControlBase { public: TransportFlowControl(const grpc_chttp2_transport* t, bool enable_bdp_probe); ~TransportFlowControl() {} + bool flow_control_enabled() const override { return true; } + bool bdp_probe() const { return enable_bdp_probe_; } // returns an announce if we should send a transport update to our peer, // else returns zero; writing_anyway indicates if a write would happen // regardless of the send - if it is false and this function returns non-zero, // this announce will cause a write to occur - uint32_t MaybeSendUpdate(bool writing_anyway); + uint32_t MaybeSendUpdate(bool writing_anyway) override; // Reads the flow control data and returns and actionable struct that will // tell chttp2 exactly what it needs to do - FlowControlAction MakeAction() { return UpdateAction(FlowControlAction()); } + FlowControlAction MakeAction() override { + return UpdateAction(FlowControlAction()); + } // Call periodically (at a low-ish rate, 100ms - 10s makes sense) // to perform more complex flow control calculations and return an action // to let chttp2 change its parameters - FlowControlAction PeriodicUpdate(); + FlowControlAction PeriodicUpdate() override; - void StreamSentData(int64_t size) { remote_window_ -= size; } + void StreamSentData(int64_t size) override { remote_window_ -= size; } grpc_error* ValidateRecvData(int64_t incoming_frame_size); void CommitRecvData(int64_t incoming_frame_size) { announced_window_ -= incoming_frame_size; } - grpc_error* RecvData(int64_t incoming_frame_size) { + grpc_error* RecvData(int64_t incoming_frame_size) override { FlowControlTrace trace(" data recv", this, nullptr); grpc_error* error = ValidateRecvData(incoming_frame_size); if (error != GRPC_ERROR_NONE) return error; @@ -170,18 +263,18 @@ class TransportFlowControl { } // we have received a WINDOW_UPDATE frame for a transport - void RecvUpdate(uint32_t size) { + void RecvUpdate(uint32_t size) override { FlowControlTrace trace("t updt recv", this, nullptr); remote_window_ += size; } - int64_t remote_window() const { return remote_window_; } - int64_t target_window() const { + // See comment above announced_stream_total_over_incoming_window_ for the + // logic behind this decision. + int64_t target_window() const override { return (uint32_t)GPR_MIN((int64_t)((1u << 31) - 1), announced_stream_total_over_incoming_window_ + target_initial_window_size_); } - int64_t announced_window() const { return announced_window_; } const grpc_chttp2_transport* transport() const { return t_; } @@ -201,15 +294,14 @@ class TransportFlowControl { } } - BdpEstimator* bdp_estimator() { return &bdp_estimator_; } + BdpEstimator* bdp_estimator() override { return &bdp_estimator_; } - void TestOnlyForceHugeWindow() { + void TestOnlyForceHugeWindow() override { announced_window_ = 1024 * 1024 * 1024; remote_window_ = 1024 * 1024 * 1024; } private: - friend class ::grpc::testing::TrickledCHTTP2; double TargetLogBdp(); double SmoothLogBdp(double value); FlowControlAction::Urgency DeltaUrgency(int32_t value, @@ -225,9 +317,6 @@ class TransportFlowControl { const grpc_chttp2_transport* const t_; - /** Our bookkeeping for the remote peer's available window */ - int64_t remote_window_ = kDefaultWindow; - /** calculating what we should give for local window: we track the total amount of flow control over initial window size across all streams: this is data that we want to receive right now (it @@ -239,13 +328,6 @@ class TransportFlowControl { int64_t announced_stream_total_over_incoming_window_ = 0; int64_t announced_stream_total_under_incoming_window_ = 0; - /** This is out window according to what we have sent to our remote peer. The - * difference between this and target window is what we use to decide when - * to send WINDOW_UPDATE frames. */ - int64_t announced_window_ = kDefaultWindow; - - int32_t target_initial_window_size_ = kDefaultWindow; - /** should we probe bdp? */ const bool enable_bdp_probe_; @@ -257,39 +339,117 @@ class TransportFlowControl { grpc_millis last_pid_update_ = 0; }; -class StreamFlowControl { +// Fat interface with all methods a stream flow control implementation needs +// to support. gRPC C Core does not support pure virtual functions, so instead +// we abort in any methods which require implementation in the base class. +class StreamFlowControlBase { + public: + StreamFlowControlBase() {} + virtual ~StreamFlowControlBase() {} + + // Updates an action using the protected members. + virtual FlowControlAction UpdateAction(FlowControlAction action) { abort(); } + + // Using the protected members, returns an Action for this stream to be + // taken by the tranport. + virtual FlowControlAction MakeAction() { abort(); } + + // Bookkeeping for when data is sent on this stream. + virtual void SentData(int64_t outgoing_frame_size) { abort(); } + + // Bookkeeping and error checking for when data is received by this stream. + virtual grpc_error* RecvData(int64_t incoming_frame_size) { abort(); } + + // Called to check if this stream needs to send a WINDOW_UPDATE frame. + virtual uint32_t MaybeSendUpdate() { abort(); } + + // Bookkeeping for receiving a WINDOW_UPDATE from for this stream. + virtual void RecvUpdate(uint32_t size) { abort(); } + + // Bookkeeping for when a call pulls bytes out of the transport. At this + // point we consider the data 'used' and can thus let out peer know we are + // ready for more data. + virtual void IncomingByteStreamUpdate(size_t max_size_hint, + size_t have_already) { + abort(); + } + + // Used in certain benchmarks in which we don't want FlowControl to be a + // factor + virtual void TestOnlyForceHugeWindow() {} + + // Getters + int64_t remote_window_delta() { return remote_window_delta_; } + int64_t local_window_delta() { return local_window_delta_; } + int64_t announced_window_delta() { return announced_window_delta_; } + + GRPC_ABSTRACT_BASE_CLASS + + protected: + friend class ::grpc::testing::TrickledCHTTP2; + int64_t remote_window_delta_ = 0; + int64_t local_window_delta_ = 0; + int64_t announced_window_delta_ = 0; +}; + +// Implementation of flow control that does NOTHING. Always returns maximum +// values, never initiates writes, and assumes that the remote peer is doing +// the same. To be used to narrow down on flow control as the cause of negative +// performance. +class StreamFlowControlDisabled : public StreamFlowControlBase { + public: + FlowControlAction UpdateAction(FlowControlAction action) override { + return action; + } + FlowControlAction MakeAction() override { return FlowControlAction(); } + void SentData(int64_t outgoing_frame_size) override {} + grpc_error* RecvData(int64_t incoming_frame_size) override { + return GRPC_ERROR_NONE; + } + uint32_t MaybeSendUpdate() override { return 0; } + void RecvUpdate(uint32_t size) override {} + void IncomingByteStreamUpdate(size_t max_size_hint, + size_t have_already) override {} +}; + +// Implementation of flow control that abides to HTTP/2 spec and attempts +// to be as performant as possible. +class StreamFlowControl final : public StreamFlowControlBase { public: StreamFlowControl(TransportFlowControl* tfc, const grpc_chttp2_stream* s); ~StreamFlowControl() { tfc_->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_); } - FlowControlAction UpdateAction(FlowControlAction action); - FlowControlAction MakeAction() { return UpdateAction(tfc_->MakeAction()); } + FlowControlAction UpdateAction(FlowControlAction action) override; + FlowControlAction MakeAction() override { + return UpdateAction(tfc_->MakeAction()); + } // we have sent data on the wire, we must track this in our bookkeeping for // the remote peer's flow control. - void SentData(int64_t outgoing_frame_size) { + void SentData(int64_t outgoing_frame_size) override { FlowControlTrace tracer(" data sent", tfc_, this); tfc_->StreamSentData(outgoing_frame_size); remote_window_delta_ -= outgoing_frame_size; } // we have received data from the wire - grpc_error* RecvData(int64_t incoming_frame_size); + grpc_error* RecvData(int64_t incoming_frame_size) override; // returns an announce if we should send a stream update to our peer, else // returns zero - uint32_t MaybeSendUpdate(); + uint32_t MaybeSendUpdate() override; // we have received a WINDOW_UPDATE frame for a stream - void RecvUpdate(uint32_t size) { + void RecvUpdate(uint32_t size) override { FlowControlTrace trace("s updt recv", tfc_, this); remote_window_delta_ += size; } // the application is asking for a certain amount of bytes - void IncomingByteStreamUpdate(size_t max_size_hint, size_t have_already); + void IncomingByteStreamUpdate(size_t max_size_hint, + size_t have_already) override; int64_t remote_window_delta() const { return remote_window_delta_; } int64_t local_window_delta() const { return local_window_delta_; } @@ -297,14 +457,13 @@ class StreamFlowControl { const grpc_chttp2_stream* stream() const { return s_; } - void TestOnlyForceHugeWindow() { + void TestOnlyForceHugeWindow() override { announced_window_delta_ = 1024 * 1024 * 1024; local_window_delta_ = 1024 * 1024 * 1024; remote_window_delta_ = 1024 * 1024 * 1024; } private: - friend class ::grpc::testing::TrickledCHTTP2; TransportFlowControl* const tfc_; const grpc_chttp2_stream* const s_; @@ -313,21 +472,6 @@ class StreamFlowControl { announced_window_delta_ += change; tfc->PostUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_); } - - /** window available for us to send to peer, over or under the initial - * window - * size of the transport... ie: - * remote_window = remote_window_delta + transport.initial_window_size */ - int64_t remote_window_delta_ = 0; - - /** window available for peer to send to us (as a delta on - * transport.initial_window_size) - * local_window = local_window_delta + transport.initial_window_size */ - int64_t local_window_delta_ = 0; - - /** window available for peer to send to us over this stream that we have - * announced to the peer */ - int64_t announced_window_delta_ = 0; }; } // namespace chttp2 diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.cc b/src/core/ext/transport/chttp2/transport/frame_settings.cc index c6c2a6c301..0d245f4ba1 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.cc +++ b/src/core/ext/transport/chttp2/transport/frame_settings.cc @@ -186,6 +186,12 @@ grpc_error* grpc_chttp2_settings_parser_parse(void* p, grpc_chttp2_transport* t, if (grpc_wire_id_to_setting_id(parser->id, &id)) { const grpc_chttp2_setting_parameters* sp = &grpc_chttp2_settings_parameters[id]; + // If flow control is disabled we skip these. + if (!t->flow_control->flow_control_enabled() && + (id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE || + id == GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE)) { + continue; + } if (parser->value < sp->min_value || parser->value > sp->max_value) { switch (sp->invalid_value_behavior) { case GRPC_CHTTP2_CLAMP_INVALID_VALUE: diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 932f5ba83d..8852d9ee2b 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -351,7 +351,10 @@ struct grpc_chttp2_transport { /** parser for goaway frames */ grpc_chttp2_goaway_parser goaway_parser; - grpc_core::ManualConstructor<grpc_core::chttp2::TransportFlowControl> + grpc_core::PolymorphicManualConstructor< + grpc_core::chttp2::TransportFlowControlBase, + grpc_core::chttp2::TransportFlowControl, + grpc_core::chttp2::TransportFlowControlDisabled> flow_control; /** initial window change. This is tracked as we parse settings frames from * the remote peer. If there is a positive delta, then we will make all @@ -525,7 +528,10 @@ struct grpc_chttp2_stream { bool sent_initial_metadata; bool sent_trailing_metadata; - grpc_core::ManualConstructor<grpc_core::chttp2::StreamFlowControl> + grpc_core::PolymorphicManualConstructor< + grpc_core::chttp2::StreamFlowControlBase, + grpc_core::chttp2::StreamFlowControl, + grpc_core::chttp2::StreamFlowControlDisabled> flow_control; grpc_slice_buffer flow_controlled_buffer; diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc index a56f89cc75..59dc38ef98 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.cc +++ b/src/core/ext/transport/chttp2/transport/parsing.cc @@ -186,9 +186,10 @@ grpc_error* grpc_chttp2_perform_read(grpc_chttp2_transport* t, return GRPC_ERROR_NONE; } goto dts_fh_0; /* loop */ - } else if (t->incoming_frame_size > - t->settings[GRPC_ACKED_SETTINGS] - [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]) { + } else if (t->flow_control->flow_control_enabled() && + t->incoming_frame_size > + t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]) { char* msg; gpr_asprintf(&msg, "Frame size %d is larger than max frame size %d", t->incoming_frame_size, diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.cc b/src/core/ext/transport/chttp2/transport/stream_lists.cc index c95d02541a..3aad8c5823 100644 --- a/src/core/ext/transport/chttp2/transport/stream_lists.cc +++ b/src/core/ext/transport/chttp2/transport/stream_lists.cc @@ -183,6 +183,7 @@ void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport* t, void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport* t, grpc_chttp2_stream* s) { + GPR_ASSERT(t->flow_control->flow_control_enabled()); stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); } @@ -198,6 +199,7 @@ void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport* t, void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s) { + GPR_ASSERT(t->flow_control->flow_control_enabled()); stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM); } |