summarylogtreecommitdiffstats
path: root/0004-limit-http-requests-per-io-cycle.patch
diff options
context:
space:
mode:
Diffstat (limited to '0004-limit-http-requests-per-io-cycle.patch')
-rw-r--r--0004-limit-http-requests-per-io-cycle.patch821
1 files changed, 821 insertions, 0 deletions
diff --git a/0004-limit-http-requests-per-io-cycle.patch b/0004-limit-http-requests-per-io-cycle.patch
new file mode 100644
index 000000000000..702e799513ec
--- /dev/null
+++ b/0004-limit-http-requests-per-io-cycle.patch
@@ -0,0 +1,821 @@
+From 2e4228b0ee73ae640c92e0974c91e251997a3d2f Mon Sep 17 00:00:00 2001
+From: Yan Avlasov <yavlasov@google.com>
+Date: Sat, 30 Sep 2023 13:58:46 +0000
+Subject: [PATCH] Limit on the number of HTTP requests processed from a
+ connection in an I/O cycle
+
+Signed-off-by: Yan Avlasov <yavlasov@google.com>
+---
+ changelogs/current.yaml | 7 +
+ source/common/http/conn_manager_impl.cc | 89 ++++++-
+ source/common/http/conn_manager_impl.h | 25 +-
+ test/common/http/conn_manager_impl_test_2.cc | 245 ++++++++++++++++++
+ .../http/conn_manager_impl_test_base.cc | 19 ++
+ .../common/http/conn_manager_impl_test_base.h | 2 +
+ test/common/http/http2/http2_frame.cc | 12 +-
+ test/common/http/http2/http2_frame.h | 14 +-
+ .../multiplexed_integration_test.cc | 170 ++++++++++++
+ 9 files changed, 569 insertions(+), 14 deletions(-)
+
+diff --git a/changelogs/current.yaml b/changelogs/current.yaml
+index b5d45089f337..86d0eac2339f 100644
+--- a/changelogs/current.yaml
++++ b/changelogs/current.yaml
+@@ -10,6 +10,13 @@
+ resets is applied. The connection is disconnected if more than 50% of resets are premature.
+ Setting the runtime key ``envoy.restart_features.send_goaway_for_premature_rst_streams`` to ``false`` completely disables
+ this check.
++- area: http
++ change: |
++ Add runtime flag ``http.max_requests_per_io_cycle`` for setting the limit on the number of HTTP requests processed
++ from a single connection in a single I/O cycle. Requests over this limit are processed in subsequent I/O cycles. This
++ mitigates CPU starvation by connections that simultaneously send high number of requests by allowing requests from other
++ connections to make progress. This runtime value can be set to 1 in the presence of abusive HTTP/2 or HTTP/3 connections.
++ By default this limit is disabled.
+ - area: build
+ change: |
+ Moved the subset, ring_hash, and maglev LB code into extensions. If you use these load balancers and override
+diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc
+index 021c55bc4cca..bf0439708413 100644
+--- a/source/common/http/conn_manager_impl.cc
++++ b/source/common/http/conn_manager_impl.cc
+@@ -60,6 +60,10 @@ const absl::string_view ConnectionManagerImpl::PrematureResetTotalStreamCountKey
+ "overload.premature_reset_total_stream_count";
+ const absl::string_view ConnectionManagerImpl::PrematureResetMinStreamLifetimeSecondsKey =
+ "overload.premature_reset_min_stream_lifetime_seconds";
++// Runtime key for maximum number of requests that can be processed from a single connection per
++// I/O cycle. Requests over this limit are deferred until the next I/O cycle.
++const absl::string_view ConnectionManagerImpl::MaxRequestsPerIoCycle =
++ "http.max_requests_per_io_cycle";
+
+ bool requestWasConnect(const RequestHeaderMapSharedPtr& headers, Protocol protocol) {
+ if (!headers) {
+@@ -116,6 +120,8 @@ ConnectionManagerImpl::ConnectionManagerImpl(ConnectionManagerConfig& config,
+ /*node_id=*/local_info_.node().id(),
+ /*server_name=*/config_.serverName(),
+ /*proxy_status_config=*/config_.proxyStatusConfig())),
++ max_requests_during_dispatch_(
++ runtime_.snapshot().getInteger(ConnectionManagerImpl::MaxRequestsPerIoCycle, UINT32_MAX)),
+ refresh_rtt_after_request_(
+ Runtime::runtimeFeatureEnabled("envoy.reloadable_features.refresh_rtt_after_request")) {}
+
+@@ -128,6 +134,10 @@ const ResponseHeaderMap& ConnectionManagerImpl::continueHeader() {
+ void ConnectionManagerImpl::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) {
+ read_callbacks_ = &callbacks;
+ dispatcher_ = &callbacks.connection().dispatcher();
++ if (max_requests_during_dispatch_ != UINT32_MAX) {
++ deferred_request_processing_callback_ =
++ dispatcher_->createSchedulableCallback([this]() -> void { onDeferredRequestProcessing(); });
++ }
+
+ stats_.named_.downstream_cx_total_.inc();
+ stats_.named_.downstream_cx_active_.inc();
+@@ -454,6 +464,7 @@ void ConnectionManagerImpl::createCodec(Buffer::Instance& data) {
+ }
+
+ Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool) {
++ requests_during_dispatch_count_ = 0;
+ if (!codec_) {
+ // Http3 codec should have been instantiated by now.
+ createCodec(data);
+@@ -1366,7 +1377,12 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPt
+ traceRequest();
+ }
+
+- filter_manager_.decodeHeaders(*request_headers_, end_stream);
++ if (!connection_manager_.shouldDeferRequestProxyingToNextIoCycle()) {
++ filter_manager_.decodeHeaders(*request_headers_, end_stream);
++ } else {
++ state_.deferred_to_next_io_iteration_ = true;
++ state_.deferred_end_stream_ = end_stream;
++ }
+
+ // Reset it here for both global and overridden cases.
+ resetIdleTimer();
+@@ -1433,8 +1449,15 @@ void ConnectionManagerImpl::ActiveStream::decodeData(Buffer::Instance& data, boo
+ connection_manager_.read_callbacks_->connection().dispatcher());
+ maybeEndDecode(end_stream);
+ filter_manager_.streamInfo().addBytesReceived(data.length());
+-
+- filter_manager_.decodeData(data, end_stream);
++ if (!state_.deferred_to_next_io_iteration_) {
++ filter_manager_.decodeData(data, end_stream);
++ } else {
++ if (!deferred_data_) {
++ deferred_data_ = std::make_unique<Buffer::OwnedImpl>();
++ }
++ deferred_data_->move(data);
++ state_.deferred_end_stream_ = end_stream;
++ }
+ }
+
+ void ConnectionManagerImpl::ActiveStream::decodeTrailers(RequestTrailerMapPtr&& trailers) {
+@@ -1450,7 +1473,9 @@ void ConnectionManagerImpl::ActiveStream::decodeTrailers(RequestTrailerMapPtr&&
+ return;
+ }
+ maybeEndDecode(true);
+- filter_manager_.decodeTrailers(*request_trailers_);
++ if (!state_.deferred_to_next_io_iteration_) {
++ filter_manager_.decodeTrailers(*request_trailers_);
++ }
+ }
+
+ void ConnectionManagerImpl::ActiveStream::decodeMetadata(MetadataMapPtr&& metadata_map) {
+@@ -2158,5 +2183,61 @@ void ConnectionManagerImpl::ActiveStream::resetStream(Http::StreamResetReason, a
+ connection_manager_.doEndStream(*this);
+ }
+
++bool ConnectionManagerImpl::ActiveStream::onDeferredRequestProcessing() {
++ // TODO(yanavlasov): Merge this with the filter manager continueIteration() method
++ if (!state_.deferred_to_next_io_iteration_) {
++ return false;
++ }
++ state_.deferred_to_next_io_iteration_ = false;
++ bool end_stream =
++ state_.deferred_end_stream_ && deferred_data_ == nullptr && request_trailers_ == nullptr;
++ filter_manager_.decodeHeaders(*request_headers_, end_stream);
++ if (end_stream) {
++ return true;
++ }
++ if (deferred_data_ != nullptr) {
++ end_stream = state_.deferred_end_stream_ && request_trailers_ == nullptr;
++ filter_manager_.decodeData(*deferred_data_, end_stream);
++ }
++ if (request_trailers_ != nullptr) {
++ filter_manager_.decodeTrailers(*request_trailers_);
++ }
++ return true;
++}
++
++bool ConnectionManagerImpl::shouldDeferRequestProxyingToNextIoCycle() {
++ // Do not defer this stream if stream deferral is disabled
++ if (deferred_request_processing_callback_ == nullptr) {
++ return false;
++ }
++ // Defer this stream if there are already deferred streams, so they are not
++ // processed out of order
++ if (deferred_request_processing_callback_->enabled()) {
++ return true;
++ }
++ ++requests_during_dispatch_count_;
++ bool defer = requests_during_dispatch_count_ > max_requests_during_dispatch_;
++ if (defer) {
++ deferred_request_processing_callback_->scheduleCallbackNextIteration();
++ }
++ return defer;
++}
++
++void ConnectionManagerImpl::onDeferredRequestProcessing() {
++ requests_during_dispatch_count_ = 1; // 1 stream is always let through
++ // Streams are inserted at the head of the list. As such process deferred
++ // streams at the back of the list first.
++ for (auto reverse_iter = streams_.rbegin(); reverse_iter != streams_.rend();) {
++ auto& stream_ptr = *reverse_iter;
++ // Move the iterator to the next item in case the `onDeferredRequestProcessing` call removes the
++ // stream from the list.
++ ++reverse_iter;
++ bool was_deferred = stream_ptr->onDeferredRequestProcessing();
++ if (was_deferred && shouldDeferRequestProxyingToNextIoCycle()) {
++ break;
++ }
++ }
++}
++
+ } // namespace Http
+ } // namespace Envoy
+diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h
+index dad494c953bc..e79a6a81c082 100644
+--- a/source/common/http/conn_manager_impl.h
++++ b/source/common/http/conn_manager_impl.h
+@@ -122,6 +122,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
+ // The minimum lifetime of a stream, in seconds, in order not to be considered
+ // prematurely closed.
+ static const absl::string_view PrematureResetMinStreamLifetimeSecondsKey;
++ static const absl::string_view MaxRequestsPerIoCycle;
+
+ private:
+ struct ActiveStream;
+@@ -345,7 +346,8 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
+ : codec_saw_local_complete_(false), codec_encode_complete_(false),
+ on_reset_stream_called_(false), is_zombie_stream_(false), saw_connection_close_(false),
+ successful_upgrade_(false), is_internally_destroyed_(false),
+- is_internally_created_(false), is_tunneling_(false), decorated_propagate_(true) {}
++ is_internally_created_(false), is_tunneling_(false), decorated_propagate_(true),
++ deferred_to_next_io_iteration_(false) {}
+
+ // It's possibly for the codec to see the completed response but not fully
+ // encode it.
+@@ -371,6 +373,14 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
+ bool is_tunneling_ : 1;
+
+ bool decorated_propagate_ : 1;
++
++ // Indicates that sending headers to the filter manager is deferred to the
++ // next I/O cycle. If data or trailers are received when this flag is set
++ // they are deferred too.
++ // TODO(yanavlasov): encapsulate the entire state of deferred streams into a separate
++ // structure, so it can be atomically created and cleared.
++ bool deferred_to_next_io_iteration_ : 1;
++ bool deferred_end_stream_ : 1;
+ };
+
+ bool canDestroyStream() const {
+@@ -418,6 +428,11 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
+ // HTTP connection manager configuration, then the entire connection is closed.
+ bool validateTrailers();
+
++ // Dispatch deferred headers, body and trailers to the filter manager.
++ // Return true if this stream was deferred and dispatched pending headers, body and trailers (if
++ // present). Return false if this stream was not deferred.
++ bool onDeferredRequestProcessing();
++
+ ConnectionManagerImpl& connection_manager_;
+ OptRef<const TracingConnectionManagerConfig> connection_manager_tracing_config_;
+ // TODO(snowp): It might make sense to move this to the FilterManager to avoid storing it in
+@@ -504,6 +519,8 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
+ const Tracing::CustomTagMap* customTags() const override;
+ bool verbose() const override;
+ uint32_t maxPathTagLength() const override;
++
++ std::unique_ptr<Buffer::OwnedImpl> deferred_data_;
+ };
+
+ using ActiveStreamPtr = std::unique_ptr<ActiveStream>;
+@@ -562,6 +579,9 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
+ // and at least half have been prematurely reset?
+ void maybeDrainDueToPrematureResets();
+
++ bool shouldDeferRequestProxyingToNextIoCycle();
++ void onDeferredRequestProcessing();
++
+ enum class DrainState { NotDraining, Draining, Closing };
+
+ ConnectionManagerConfig& config_;
+@@ -608,6 +628,9 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
+ // the definition given in `isPrematureRstStream()`.
+ uint64_t number_premature_stream_resets_{0};
+ const std::string proxy_name_; // for Proxy-Status.
++ uint32_t requests_during_dispatch_count_{0};
++ const uint32_t max_requests_during_dispatch_{UINT32_MAX};
++ Event::SchedulableCallbackPtr deferred_request_processing_callback_;
+
+ const bool refresh_rtt_after_request_{};
+ };
+diff --git a/test/common/http/conn_manager_impl_test_2.cc b/test/common/http/conn_manager_impl_test_2.cc
+index 5daf2b1a45e6..078bd1d85ab7 100644
+--- a/test/common/http/conn_manager_impl_test_2.cc
++++ b/test/common/http/conn_manager_impl_test_2.cc
+@@ -11,6 +11,7 @@ using testing::InvokeWithoutArgs;
+ using testing::Mock;
+ using testing::Ref;
+ using testing::Return;
++using testing::ReturnArg;
+ using testing::ReturnRef;
+
+ namespace Envoy {
+@@ -3767,5 +3768,249 @@ TEST_F(HttpConnectionManagerImplTest, NoProxyProtocolAdded) {
+ // Clean up.
+ filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose);
+ }
++
++// Validate that deferred streams are processed with a variety of
++// headers, data and trailer arriving in the same I/O cycle
++TEST_F(HttpConnectionManagerImplTest, LimitWorkPerIOCycle) {
++ const int kRequestsSentPerIOCycle = 100;
++ EXPECT_CALL(runtime_.snapshot_, getInteger(_, _)).WillRepeatedly(ReturnArg<1>());
++ // Process 1 request per I/O cycle
++ auto* deferred_request_callback = enableStreamsPerIoLimit(1);
++ setup(false, "");
++
++ // Store the basic request encoder during filter chain setup.
++ std::vector<std::shared_ptr<MockStreamDecoderFilter>> encoder_filters;
++ int decode_headers_call_count = 0;
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ std::shared_ptr<MockStreamDecoderFilter> filter(new NiceMock<MockStreamDecoderFilter>());
++
++ // Each 4th request is headers only
++ EXPECT_CALL(*filter, decodeHeaders(_, i % 4 == 0 ? true : false))
++ .WillRepeatedly(Invoke([&](RequestHeaderMap&, bool) -> FilterHeadersStatus {
++ ++decode_headers_call_count;
++ return FilterHeadersStatus::StopIteration;
++ }));
++
++ // Each 1st request is headers and data only
++ // Each 2nd request is headers, data and trailers
++ if (i % 4 == 1 || i % 4 == 2) {
++ EXPECT_CALL(*filter, decodeData(_, i % 4 == 1 ? true : false))
++ .WillOnce(Return(FilterDataStatus::StopIterationNoBuffer));
++ }
++
++ // Each 3rd request is headers and trailers (no data)
++ if (i % 4 == 2 || i % 4 == 3) {
++ EXPECT_CALL(*filter, decodeTrailers(_)).WillOnce(Return(FilterTrailersStatus::StopIteration));
++ }
++
++ EXPECT_CALL(*filter, setDecoderFilterCallbacks(_));
++ encoder_filters.push_back(std::move(filter));
++ }
++
++ uint64_t random_value = 0;
++ EXPECT_CALL(random_, random()).WillRepeatedly(Invoke([&random_value]() {
++ return random_value++;
++ }));
++
++ EXPECT_CALL(filter_factory_, createFilterChain(_))
++ .Times(kRequestsSentPerIOCycle)
++ .WillRepeatedly(Invoke([&encoder_filters](FilterChainManager& manager) -> bool {
++ static int index = 0;
++ int i = index++;
++ FilterFactoryCb factory([&encoder_filters, i](FilterChainFactoryCallbacks& callbacks) {
++ callbacks.addStreamDecoderFilter(encoder_filters[i]);
++ });
++ manager.applyFilterFactoryCb({}, factory);
++ return true;
++ }));
++
++ EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, deferredDelete_(_))
++ .Times(kRequestsSentPerIOCycle);
++
++ std::vector<NiceMock<MockResponseEncoder>> response_encoders(kRequestsSentPerIOCycle);
++ for (auto& encoder : response_encoders) {
++ EXPECT_CALL(encoder, getStream()).WillRepeatedly(ReturnRef(encoder.stream_));
++ }
++
++ EXPECT_CALL(*codec_, dispatch(_))
++ .WillRepeatedly(Invoke([&](Buffer::Instance& data) -> Http::Status {
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ decoder_ = &conn_manager_->newStream(response_encoders[i]);
++
++ RequestHeaderMapPtr headers{new TestRequestHeaderMapImpl{
++ {":authority", "host"}, {":path", "/"}, {":method", "GET"}}};
++
++ RequestTrailerMapPtr trailers{
++ new TestRequestTrailerMapImpl{{"key1", "value1"}, {"key2", "value2"}}};
++
++ Buffer::OwnedImpl data("data");
++
++ switch (i % 4) {
++ case 0:
++ decoder_->decodeHeaders(std::move(headers), true);
++ break;
++ case 1:
++ decoder_->decodeHeaders(std::move(headers), false);
++ decoder_->decodeData(data, true);
++ break;
++ case 2:
++ decoder_->decodeHeaders(std::move(headers), false);
++ decoder_->decodeData(data, false);
++ decoder_->decodeTrailers(std::move(trailers));
++ break;
++ case 3:
++ decoder_->decodeHeaders(std::move(headers), false);
++ decoder_->decodeTrailers(std::move(trailers));
++ break;
++ }
++ }
++
++ data.drain(4);
++ return Http::okStatus();
++ }));
++
++ // Kick off the incoming data.
++ Buffer::OwnedImpl fake_input("1234");
++ conn_manager_->onData(fake_input, false);
++
++ EXPECT_TRUE(deferred_request_callback->enabled_);
++ // Only one request should go through the filter chain
++ ASSERT_EQ(decode_headers_call_count, 1);
++
++ // Let other requests to go through the filter chain. Call expectations will fail
++ // if this is not the case.
++ int deferred_request_count = 0;
++ while (deferred_request_callback->enabled_) {
++ deferred_request_callback->invokeCallback();
++ ++deferred_request_count;
++ }
++
++ ASSERT_EQ(deferred_request_count, kRequestsSentPerIOCycle);
++
++ for (auto& filter : encoder_filters) {
++ ResponseHeaderMapPtr response_headers{new TestResponseHeaderMapImpl{{":status", "200"}}};
++ filter->callbacks_->streamInfo().setResponseCodeDetails("");
++ filter->callbacks_->encodeHeaders(std::move(response_headers), true, "details");
++ }
++
++ EXPECT_EQ(kRequestsSentPerIOCycle, stats_.named_.downstream_rq_2xx_.value());
++ EXPECT_EQ(kRequestsSentPerIOCycle, listener_stats_.downstream_rq_2xx_.value());
++ EXPECT_EQ(kRequestsSentPerIOCycle, stats_.named_.downstream_rq_completed_.value());
++ EXPECT_EQ(kRequestsSentPerIOCycle, listener_stats_.downstream_rq_completed_.value());
++}
++
++TEST_F(HttpConnectionManagerImplTest, StreamDeferralPreservesOrder) {
++ EXPECT_CALL(runtime_.snapshot_, getInteger(_, _)).WillRepeatedly(ReturnArg<1>());
++ // Process 1 request per I/O cycle
++ auto* deferred_request_callback = enableStreamsPerIoLimit(1);
++ setup(false, "");
++
++ std::vector<std::shared_ptr<MockStreamDecoderFilter>> encoder_filters;
++ int expected_request_id = 0;
++ const Http::LowerCaseString request_id_header(absl::string_view("request-id"));
++ // Two requests are processed in 2 I/O reads
++ const int TotalRequests = 2 * 2;
++ for (int i = 0; i < TotalRequests; ++i) {
++ std::shared_ptr<MockStreamDecoderFilter> filter(new NiceMock<MockStreamDecoderFilter>());
++
++ EXPECT_CALL(*filter, decodeHeaders(_, true))
++ .WillRepeatedly(Invoke([&](RequestHeaderMap& headers, bool) -> FilterHeadersStatus {
++ // Check that requests are decoded in expected order
++ int request_id = 0;
++ ASSERT(absl::SimpleAtoi(headers.get(request_id_header)[0]->value().getStringView(),
++ &request_id));
++ ASSERT(request_id == expected_request_id);
++ ++expected_request_id;
++ return FilterHeadersStatus::StopIteration;
++ }));
++
++ EXPECT_CALL(*filter, setDecoderFilterCallbacks(_));
++ encoder_filters.push_back(std::move(filter));
++ }
++
++ uint64_t random_value = 0;
++ EXPECT_CALL(random_, random()).WillRepeatedly(Invoke([&random_value]() {
++ return random_value++;
++ }));
++
++ EXPECT_CALL(filter_factory_, createFilterChain(_))
++ .Times(TotalRequests)
++ .WillRepeatedly(Invoke([&encoder_filters](FilterChainManager& manager) -> bool {
++ static int index = 0;
++ int i = index++;
++ FilterFactoryCb factory([&encoder_filters, i](FilterChainFactoryCallbacks& callbacks) {
++ callbacks.addStreamDecoderFilter(encoder_filters[i]);
++ });
++ manager.applyFilterFactoryCb({}, factory);
++ return true;
++ }));
++
++ EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, deferredDelete_(_)).Times(TotalRequests);
++
++ std::vector<NiceMock<MockResponseEncoder>> response_encoders(TotalRequests);
++ for (auto& encoder : response_encoders) {
++ EXPECT_CALL(encoder, getStream()).WillRepeatedly(ReturnRef(encoder.stream_));
++ }
++ auto response_encoders_iter = response_encoders.begin();
++
++ int request_id = 0;
++ EXPECT_CALL(*codec_, dispatch(_))
++ .WillRepeatedly(Invoke([&](Buffer::Instance& data) -> Http::Status {
++ // The second request should be deferred
++ for (int i = 0; i < 2; ++i) {
++ decoder_ = &conn_manager_->newStream(*response_encoders_iter);
++ ++response_encoders_iter;
++
++ RequestHeaderMapPtr headers{
++ new TestRequestHeaderMapImpl{{":authority", "host"},
++ {":path", "/"},
++ {":method", "GET"},
++ {"request-id", absl::StrCat(request_id)}}};
++
++ ++request_id;
++ decoder_->decodeHeaders(std::move(headers), true);
++ }
++
++ data.drain(4);
++ return Http::okStatus();
++ }));
++
++ // Kick off the incoming data.
++ Buffer::OwnedImpl fake_input("1234");
++ conn_manager_->onData(fake_input, false);
++
++ EXPECT_TRUE(deferred_request_callback->enabled_);
++ // Only one request should go through the filter chain
++ ASSERT_EQ(expected_request_id, 1);
++
++ // Test arrival of another request. New request is read from the socket before deferred callbacks.
++ Buffer::OwnedImpl fake_input2("1234");
++ conn_manager_->onData(fake_input2, false);
++
++ // No requests from the second read should go through as there are deferred stream present
++ ASSERT_EQ(expected_request_id, 1);
++
++ // Let other requests to go through the filter chain. Call expectations will fail
++ // if this is not the case.
++ int deferred_request_count = 0;
++ while (deferred_request_callback->enabled_) {
++ deferred_request_callback->invokeCallback();
++ ++deferred_request_count;
++ }
++
++ ASSERT_EQ(deferred_request_count, TotalRequests);
++
++ for (auto& filter : encoder_filters) {
++ ResponseHeaderMapPtr response_headers{new TestResponseHeaderMapImpl{{":status", "200"}}};
++ filter->callbacks_->streamInfo().setResponseCodeDetails("");
++ filter->callbacks_->encodeHeaders(std::move(response_headers), true, "details");
++ }
++
++ EXPECT_EQ(TotalRequests, stats_.named_.downstream_rq_2xx_.value());
++ EXPECT_EQ(TotalRequests, listener_stats_.downstream_rq_2xx_.value());
++ EXPECT_EQ(TotalRequests, stats_.named_.downstream_rq_completed_.value());
++ EXPECT_EQ(TotalRequests, listener_stats_.downstream_rq_completed_.value());
++}
++
+ } // namespace Http
+ } // namespace Envoy
+diff --git a/test/common/http/conn_manager_impl_test_base.cc b/test/common/http/conn_manager_impl_test_base.cc
+index 1c5053246e44..641ca6fae082 100644
+--- a/test/common/http/conn_manager_impl_test_base.cc
++++ b/test/common/http/conn_manager_impl_test_base.cc
+@@ -78,6 +78,7 @@ void HttpConnectionManagerImplMixin::setup(bool ssl, const std::string& server_n
+ conn_manager_ = std::make_unique<ConnectionManagerImpl>(
+ *this, drain_close_, random_, http_context_, runtime_, local_info_, cluster_manager_,
+ overload_manager_, test_time_.timeSystem());
++
+ conn_manager_->initializeReadFilterCallbacks(filter_callbacks_);
+
+ if (tracing) {
+@@ -370,5 +371,23 @@ void HttpConnectionManagerImplMixin::expectUhvTrailerCheck(
+ }));
+ }
+
++Event::MockSchedulableCallback*
++HttpConnectionManagerImplMixin::enableStreamsPerIoLimit(uint32_t limit) {
++ EXPECT_CALL(runtime_.snapshot_, getInteger("http.max_requests_per_io_cycle", _))
++ .WillOnce(Return(limit));
++
++ // Expect HCM to create and set schedulable callback
++ auto* deferred_request_callback =
++ new Event::MockSchedulableCallback(&filter_callbacks_.connection_.dispatcher_);
++ EXPECT_CALL(*deferred_request_callback, enabled())
++ .WillRepeatedly(
++ Invoke([deferred_request_callback]() { return deferred_request_callback->enabled_; }));
++ EXPECT_CALL(*deferred_request_callback, scheduleCallbackNextIteration())
++ .WillRepeatedly(
++ Invoke([deferred_request_callback]() { deferred_request_callback->enabled_ = true; }));
++
++ return deferred_request_callback;
++}
++
+ } // namespace Http
+ } // namespace Envoy
+diff --git a/test/common/http/conn_manager_impl_test_base.h b/test/common/http/conn_manager_impl_test_base.h
+index 99fdc9344118..10a4ff15b54a 100644
+--- a/test/common/http/conn_manager_impl_test_base.h
++++ b/test/common/http/conn_manager_impl_test_base.h
+@@ -202,6 +202,8 @@ class HttpConnectionManagerImplMixin : public ConnectionManagerConfig {
+ HeaderValidator::TransformationResult transformation_result,
+ bool expect_response = true);
+
++ Event::MockSchedulableCallback* enableStreamsPerIoLimit(uint32_t limit);
++
+ Envoy::Event::SimulatedTimeSystem test_time_;
+ NiceMock<Router::MockRouteConfigProvider> route_config_provider_;
+ std::shared_ptr<Router::MockConfig> route_config_{new NiceMock<Router::MockConfig>()};
+diff --git a/test/common/http/http2/http2_frame.cc b/test/common/http/http2/http2_frame.cc
+index c5172938a804..46ba3751ba24 100644
+--- a/test/common/http/http2/http2_frame.cc
++++ b/test/common/http/http2/http2_frame.cc
+@@ -341,7 +341,11 @@ Http2Frame Http2Frame::makeRequest(uint32_t stream_index, absl::string_view host
+ makeNetworkOrderStreamId(stream_index));
+ frame.appendStaticHeader(StaticHeaderIndex::MethodGet);
+ frame.appendStaticHeader(StaticHeaderIndex::SchemeHttps);
+- frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Path, path);
++ if (path.empty() || path == "/") {
++ frame.appendStaticHeader(StaticHeaderIndex::Path);
++ } else {
++ frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Path, path);
++ }
+ frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Authority, host);
+ frame.adjustPayloadSize();
+ return frame;
+@@ -365,7 +369,11 @@ Http2Frame Http2Frame::makePostRequest(uint32_t stream_index, absl::string_view
+ makeNetworkOrderStreamId(stream_index));
+ frame.appendStaticHeader(StaticHeaderIndex::MethodPost);
+ frame.appendStaticHeader(StaticHeaderIndex::SchemeHttps);
+- frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Path, path);
++ if (path.empty() || path == "/") {
++ frame.appendStaticHeader(StaticHeaderIndex::Path);
++ } else {
++ frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Path, path);
++ }
+ frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Authority, host);
+ frame.adjustPayloadSize();
+ return frame;
+diff --git a/test/common/http/http2/http2_frame.h b/test/common/http/http2/http2_frame.h
+index 7fdf510ea256..5df3375c1661 100644
+--- a/test/common/http/http2/http2_frame.h
++++ b/test/common/http/http2/http2_frame.h
+@@ -253,6 +253,13 @@ class Http2Frame {
+ ConstIterator end() const { return data_.end(); }
+ bool empty() const { return data_.empty(); }
+
++ void appendHeaderWithoutIndexing(const Header& header);
++ // This method updates payload length in the HTTP2 header based on the size of the data_
++ void adjustPayloadSize() {
++ ASSERT(size() >= HeaderSize);
++ setPayloadSize(size() - HeaderSize);
++ }
++
+ private:
+ void buildHeader(Type type, uint32_t payload_size = 0, uint8_t flags = 0, uint32_t stream_id = 0);
+ void setPayloadSize(uint32_t size);
+@@ -272,15 +279,8 @@ class Http2Frame {
+ // Headers are directly encoded
+ void appendStaticHeader(StaticHeaderIndex index);
+ void appendHeaderWithoutIndexing(StaticHeaderIndex index, absl::string_view value);
+- void appendHeaderWithoutIndexing(const Header& header);
+ void appendEmptyHeader();
+
+- // This method updates payload length in the HTTP2 header based on the size of the data_
+- void adjustPayloadSize() {
+- ASSERT(size() >= HeaderSize);
+- setPayloadSize(size() - HeaderSize);
+- }
+-
+ DataContainer data_;
+ };
+
+diff --git a/test/integration/multiplexed_integration_test.cc b/test/integration/multiplexed_integration_test.cc
+index b4260ff3fa9e..edce9a1838d9 100644
+--- a/test/integration/multiplexed_integration_test.cc
++++ b/test/integration/multiplexed_integration_test.cc
+@@ -2130,6 +2130,176 @@ TEST_P(Http2FrameIntegrationTest, AccessLogOfWireBytesIfResponseSizeGreaterThanW
+ tcp_client_->close();
+ }
+
++TEST_P(Http2FrameIntegrationTest, MultipleHeaderOnlyRequests) {
++ const int kRequestsSentPerIOCycle = 20;
++ autonomous_upstream_ = true;
++ config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1");
++ beginSession();
++
++ std::string buffer;
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ auto request = Http2Frame::makeRequest(Http2Frame::makeClientStreamId(i), "a", "/",
++ {{"response_data_blocks", "0"}, {"no_trailers", "1"}});
++ absl::StrAppend(&buffer, std::string(request));
++ }
++
++ ASSERT_TRUE(tcp_client_->write(buffer, false, false));
++
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ auto frame = readFrame();
++ EXPECT_EQ(Http2Frame::Type::Headers, frame.type());
++ EXPECT_EQ(Http2Frame::ResponseStatus::Ok, frame.responseStatus());
++ }
++ tcp_client_->close();
++}
++
++TEST_P(Http2FrameIntegrationTest, MultipleRequests) {
++ const int kRequestsSentPerIOCycle = 20;
++ autonomous_upstream_ = true;
++ config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1");
++ beginSession();
++
++ std::string buffer;
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ auto request =
++ Http2Frame::makePostRequest(Http2Frame::makeClientStreamId(i), "a", "/",
++ {{"response_data_blocks", "0"}, {"no_trailers", "1"}});
++ absl::StrAppend(&buffer, std::string(request));
++ }
++
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ auto data = Http2Frame::makeDataFrame(Http2Frame::makeClientStreamId(i), "a",
++ Http2Frame::DataFlags::EndStream);
++ absl::StrAppend(&buffer, std::string(data));
++ }
++
++ ASSERT_TRUE(tcp_client_->write(buffer, false, false));
++
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ auto frame = readFrame();
++ EXPECT_EQ(Http2Frame::Type::Headers, frame.type());
++ EXPECT_EQ(Http2Frame::ResponseStatus::Ok, frame.responseStatus());
++ }
++ tcp_client_->close();
++}
++
++TEST_P(Http2FrameIntegrationTest, MultipleRequestsWithTrailers) {
++ const int kRequestsSentPerIOCycle = 20;
++ autonomous_upstream_ = true;
++ config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1");
++ beginSession();
++
++ std::string buffer;
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ auto request =
++ Http2Frame::makePostRequest(Http2Frame::makeClientStreamId(i), "a", "/",
++ {{"response_data_blocks", "0"}, {"no_trailers", "1"}});
++ absl::StrAppend(&buffer, std::string(request));
++ }
++
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ auto data = Http2Frame::makeDataFrame(Http2Frame::makeClientStreamId(i), "a");
++ absl::StrAppend(&buffer, std::string(data));
++ }
++
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ auto trailers = Http2Frame::makeEmptyHeadersFrame(
++ Http2Frame::makeClientStreamId(i),
++ static_cast<Http2Frame::HeadersFlags>(Http::Http2::orFlags(
++ Http2Frame::HeadersFlags::EndStream, Http2Frame::HeadersFlags::EndHeaders)));
++ trailers.appendHeaderWithoutIndexing({"k", "v"});
++ trailers.adjustPayloadSize();
++ absl::StrAppend(&buffer, std::string(trailers));
++ }
++
++ ASSERT_TRUE(tcp_client_->write(buffer, false, false));
++
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ auto frame = readFrame();
++ EXPECT_EQ(Http2Frame::Type::Headers, frame.type());
++ EXPECT_EQ(Http2Frame::ResponseStatus::Ok, frame.responseStatus());
++ }
++ tcp_client_->close();
++}
++
++TEST_P(Http2FrameIntegrationTest, MultipleHeaderOnlyRequestsFollowedByReset) {
++ // This number of requests stays below premature reset detection.
++ const int kRequestsSentPerIOCycle = 20;
++ config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1");
++ beginSession();
++
++ std::string buffer;
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ auto request = Http2Frame::makeRequest(Http2Frame::makeClientStreamId(i), "a", "/",
++ {{"response_data_blocks", "0"}, {"no_trailers", "1"}});
++ absl::StrAppend(&buffer, std::string(request));
++ }
++
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ auto reset = Http2Frame::makeResetStreamFrame(Http2Frame::makeClientStreamId(i),
++ Http2Frame::ErrorCode::Cancel);
++ absl::StrAppend(&buffer, std::string(reset));
++ }
++
++ ASSERT_TRUE(tcp_client_->write(buffer, false, false));
++ test_server_->waitForCounterEq("http.config_test.downstream_rq_rx_reset",
++ kRequestsSentPerIOCycle);
++ // Client should remain connected
++ ASSERT_TRUE(tcp_client_->connected());
++ tcp_client_->close();
++}
++
++TEST_P(Http2FrameIntegrationTest, ResettingDeferredRequestsTriggersPrematureResetCheck) {
++ const int kRequestsSentPerIOCycle = 20;
++ // Set premature stream count to the number of streams we are about to send
++ config_helper_.addRuntimeOverride("overload.premature_reset_total_stream_count", "20");
++ config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1");
++ beginSession();
++
++ std::string buffer;
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ auto request = Http2Frame::makeRequest(Http2Frame::makeClientStreamId(i), "a", "/",
++ {{"response_data_blocks", "0"}, {"no_trailers", "1"}});
++ absl::StrAppend(&buffer, std::string(request));
++ }
++
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ auto reset = Http2Frame::makeResetStreamFrame(Http2Frame::makeClientStreamId(i),
++ Http2Frame::ErrorCode::Cancel);
++ absl::StrAppend(&buffer, std::string(reset));
++ }
++
++ ASSERT_TRUE(tcp_client_->write(buffer, false, false));
++ // Envoy should close the client connection due to too many premature resets
++ tcp_client_->waitForDisconnect();
++ test_server_->waitForCounterEq("http.config_test.downstream_rq_too_many_premature_resets", 1);
++}
++
++TEST_P(Http2FrameIntegrationTest, CloseConnectionWithDeferredStreams) {
++ // Use large number of requests to ensure close is detected while there are
++ // still some deferred streams.
++ const int kRequestsSentPerIOCycle = 1000;
++ config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1");
++ // Ensure premature reset detection does not get in the way
++ config_helper_.addRuntimeOverride("overload.premature_reset_total_stream_count", "1001");
++ beginSession();
++
++ std::string buffer;
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ auto request = Http2Frame::makeRequest(Http2Frame::makeClientStreamId(i), "a", "/",
++ {{"response_data_blocks", "0"}, {"no_trailers", "1"}});
++ absl::StrAppend(&buffer, std::string(request));
++ }
++
++ ASSERT_TRUE(tcp_client_->write(buffer, false, false));
++ ASSERT_TRUE(tcp_client_->connected());
++ // Drop the downstream connection
++ tcp_client_->close();
++ // Test that Envoy can clean-up deferred streams
++ test_server_->waitForCounterEq("http.config_test.downstream_rq_rx_reset",
++ kRequestsSentPerIOCycle);
++}
++
+ INSTANTIATE_TEST_SUITE_P(IpVersions, Http2FrameIntegrationTest,
+ testing::ValuesIn(Http2FrameIntegrationTest::testParams()),
+ frameIntegrationTestParamToString);
+--- envoy-1.27.0.a/changelogs/current.yaml 2023-10-10 11:36:11.553031312 -0400
++++ envoy-1.27.0.b/changelogs/current.yaml 2023-10-10 11:37:56.601045565 -0400