author     Dan Fuhry  2023-10-10 11:42:03 -0400
committer  Dan Fuhry  2023-10-10 11:42:03 -0400
commit     eff67d2594574e06a001f61a98e23fe9e3cad678
tree       d98732bd622a39d72d7081fa03e737426c077270
parent     c1693f10e3ec96dfd5283228d7b8f4991abbf53e
download   aur-eff67d2594574e06a001f61a98e23fe9e3cad678.tar.gz
upgpkg: envoyproxy 1.27.0-2
Add patches for CVE-2023-44487, "HTTP/2 Rapid Reset DDoS attack".

See: https://blog.cloudflare.com/technical-breakdown-http2-rapid-reset-ddos-attack/
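The patches below are upstream Envoy commits, and the knobs they introduce are plain runtime keys, so they can be pinned or tuned without rebuilding the package. A minimal sketch of a static runtime layer in the Envoy bootstrap that sets the premature-reset keys from the first patch to their defaults (the layer name is illustrative; the key names and default values come from the patch's changelog entry):

layered_runtime:
  layers:
  - name: rapid_reset_mitigation   # illustrative layer name
    static_layer:
      # Master switch for the premature-reset check (on by default once patched).
      envoy.restart_features.send_goaway_for_premature_rst_streams: true
      # A stream reset within this many seconds of creation counts as premature.
      overload.premature_reset_min_stream_lifetime_seconds: 1
      # Evaluate the premature-reset ratio once this many streams have closed;
      # the connection is dropped if more than 50% of resets were premature.
      overload.premature_reset_total_stream_count: 500

Lowering overload.premature_reset_total_stream_count trips the check sooner, at the cost of more false positives on clients that legitimately cancel many requests.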
-rw-r--r--  0003-close-http-connections-that-prematurely-reset-stream.patch  312
-rw-r--r--  0004-limit-http-requests-per-io-cycle.patch                      821
2 files changed, 1133 insertions, 0 deletions
diff --git a/0003-close-http-connections-that-prematurely-reset-stream.patch b/0003-close-http-connections-that-prematurely-reset-stream.patch
new file mode 100644
index 000000000000..936730eea7ec
--- /dev/null
+++ b/0003-close-http-connections-that-prematurely-reset-stream.patch
@@ -0,0 +1,312 @@
+From 481a9d54b8f5d73e16c6673eaea1a781c5e5705e Mon Sep 17 00:00:00 2001
+From: Yan Avlasov <yavlasov@google.com>
+Date: Thu, 28 Sep 2023 16:11:58 +0000
+Subject: [PATCH] Close HTTP connections that prematurely reset streams
+
+Signed-off-by: Yan Avlasov <yavlasov@google.com>
+---
+ changelogs/current.yaml | 9 +++
+ source/common/http/conn_manager_config.h | 1 +
+ source/common/http/conn_manager_impl.cc | 65 +++++++++++++++++
+ source/common/http/conn_manager_impl.h | 23 ++++++
+ source/common/runtime/runtime_features.cc | 1 +
+ .../multiplexed_integration_test.cc | 72 +++++++++++++++++++
+ 6 files changed, 171 insertions(+)
+
+diff --git a/changelogs/current.yaml b/changelogs/current.yaml
+index 32fc3bc047fd2..b5d45089f337c 100644
+--- a/changelogs/current.yaml
++++ b/changelogs/current.yaml
+@@ -1,6 +1,15 @@
+ date: July 26, 2023
+
+ behavior_changes:
++- area: http
++  change: |
++ Close HTTP/2 and HTTP/3 connections that prematurely reset streams. The runtime key
++    ``overload.premature_reset_min_stream_lifetime_seconds`` determines the interval during which a received stream
++    reset is considered premature (1 second by default). The runtime key ``overload.premature_reset_total_stream_count``,
++ with the default value of 500, determines the number of requests received from a connection before the check for premature
++ resets is applied. The connection is disconnected if more than 50% of resets are premature.
++ Setting the runtime key ``envoy.restart_features.send_goaway_for_premature_rst_streams`` to ``false`` completely disables
++ this check.
+ - area: build
+ change: |
+ Moved the subset, ring_hash, and maglev LB code into extensions. If you use these load balancers and override
+diff --git a/source/common/http/conn_manager_config.h b/source/common/http/conn_manager_config.h
+index 52f8188c205c5..a07eb825f789b 100644
+--- a/source/common/http/conn_manager_config.h
++++ b/source/common/http/conn_manager_config.h
+@@ -66,6 +66,7 @@ namespace Http {
+ COUNTER(downstream_rq_rejected_via_ip_detection) \
+ COUNTER(downstream_rq_response_before_rq_complete) \
+ COUNTER(downstream_rq_rx_reset) \
++ COUNTER(downstream_rq_too_many_premature_resets) \
+ COUNTER(downstream_rq_timeout) \
+ COUNTER(downstream_rq_header_timeout) \
+ COUNTER(downstream_rq_too_large) \
+diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc
+index 5a603aaed80b6..021c55bc4ccae 100644
+--- a/source/common/http/conn_manager_impl.cc
++++ b/source/common/http/conn_manager_impl.cc
+@@ -1,5 +1,6 @@
+ #include "source/common/http/conn_manager_impl.h"
+
++#include <chrono>
+ #include <cstdint>
+ #include <functional>
+ #include <list>
+@@ -55,6 +56,11 @@
+ namespace Envoy {
+ namespace Http {
+
++const absl::string_view ConnectionManagerImpl::PrematureResetTotalStreamCountKey =
++ "overload.premature_reset_total_stream_count";
++const absl::string_view ConnectionManagerImpl::PrematureResetMinStreamLifetimeSecondsKey =
++ "overload.premature_reset_min_stream_lifetime_seconds";
++
+ bool requestWasConnect(const RequestHeaderMapSharedPtr& headers, Protocol protocol) {
+ if (!headers) {
+ return false;
+@@ -267,6 +273,12 @@ void ConnectionManagerImpl::doEndStream(ActiveStream& stream, bool check_for_def
+ }
+
+ void ConnectionManagerImpl::doDeferredStreamDestroy(ActiveStream& stream) {
++ if (!stream.state_.is_internally_destroyed_) {
++ ++closed_non_internally_destroyed_requests_;
++ if (isPrematureRstStream(stream)) {
++ ++number_premature_stream_resets_;
++ }
++ }
+ if (stream.max_stream_duration_timer_ != nullptr) {
+ stream.max_stream_duration_timer_->disableTimer();
+ stream.max_stream_duration_timer_ = nullptr;
+@@ -343,6 +355,7 @@ void ConnectionManagerImpl::doDeferredStreamDestroy(ActiveStream& stream) {
+ if (connection_idle_timer_ && streams_.empty()) {
+ connection_idle_timer_->enableTimer(config_.idleTimeout().value());
+ }
++ maybeDrainDueToPrematureResets();
+ }
+
+ RequestDecoder& ConnectionManagerImpl::newStream(ResponseEncoder& response_encoder,
+@@ -607,6 +620,58 @@ void ConnectionManagerImpl::doConnectionClose(
+ }
+ }
+
++bool ConnectionManagerImpl::isPrematureRstStream(const ActiveStream& stream) const {
++  // Check if the request was prematurely reset by comparing its lifetime to the configured
++ // threshold.
++ ASSERT(!stream.state_.is_internally_destroyed_);
++ absl::optional<std::chrono::nanoseconds> duration =
++ stream.filter_manager_.streamInfo().currentDuration();
++
++ // Check if request lifetime is longer than the premature reset threshold.
++ if (duration) {
++ const uint64_t lifetime = std::chrono::duration_cast<std::chrono::seconds>(*duration).count();
++ const uint64_t min_lifetime = runtime_.snapshot().getInteger(
++ ConnectionManagerImpl::PrematureResetMinStreamLifetimeSecondsKey, 1);
++ if (lifetime > min_lifetime) {
++ return false;
++ }
++ }
++
++  // If the request completed before the configured threshold, also check whether Envoy proxied the
++  // response from the upstream. Requests without a response status were reset.
++ // TODO(RyanTheOptimist): Possibly support half_closed_local instead.
++ return !stream.filter_manager_.streamInfo().responseCode();
++}
++
++// Sends a GOAWAY if too many streams have been reset prematurely on this
++// connection.
++void ConnectionManagerImpl::maybeDrainDueToPrematureResets() {
++ if (!Runtime::runtimeFeatureEnabled(
++ "envoy.restart_features.send_goaway_for_premature_rst_streams") ||
++ closed_non_internally_destroyed_requests_ == 0) {
++ return;
++ }
++
++ const uint64_t limit =
++ runtime_.snapshot().getInteger(ConnectionManagerImpl::PrematureResetTotalStreamCountKey, 500);
++
++ if (closed_non_internally_destroyed_requests_ < limit) {
++ return;
++ }
++
++ if (static_cast<double>(number_premature_stream_resets_) /
++ closed_non_internally_destroyed_requests_ <
++ .5) {
++ return;
++ }
++
++ if (drain_state_ == DrainState::NotDraining) {
++ stats_.named_.downstream_rq_too_many_premature_resets_.inc();
++ doConnectionClose(Network::ConnectionCloseType::Abort, absl::nullopt,
++ "too_many_premature_resets");
++ }
++}
++
+ void ConnectionManagerImpl::onGoAway(GoAwayErrorCode) {
+ // Currently we do nothing with remote go away frames. In the future we can decide to no longer
+ // push resources if applicable.
+diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h
+index b82b1967a5115..dad494c953bc0 100644
+--- a/source/common/http/conn_manager_impl.h
++++ b/source/common/http/conn_manager_impl.h
+@@ -115,6 +115,14 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
+ void setClearHopByHopResponseHeaders(bool value) { clear_hop_by_hop_response_headers_ = value; }
+ bool clearHopByHopResponseHeaders() const { return clear_hop_by_hop_response_headers_; }
+
++ // This runtime key configures the number of streams which must be closed on a connection before
++  // Envoy will potentially drain the connection due to excessive prematurely reset streams.
++ static const absl::string_view PrematureResetTotalStreamCountKey;
++
++ // The minimum lifetime of a stream, in seconds, in order not to be considered
++ // prematurely closed.
++ static const absl::string_view PrematureResetMinStreamLifetimeSecondsKey;
++
+ private:
+ struct ActiveStream;
+ class MobileConnectionManagerImpl;
+@@ -544,6 +552,15 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
+ void doConnectionClose(absl::optional<Network::ConnectionCloseType> close_type,
+ absl::optional<StreamInfo::ResponseFlag> response_flag,
+ absl::string_view details);
++ // Returns true if a RST_STREAM for the given stream is premature. Premature
++  // means the RST_STREAM arrived before response headers were sent and that
++  // the stream was alive for a short period of time. This period is specified
++ // by the optional runtime value PrematureResetMinStreamLifetimeSecondsKey,
++ // or one second if that is not present.
++ bool isPrematureRstStream(const ActiveStream& stream) const;
++  // Sends a GOAWAY if enough streams have been closed on a connection
++  // and at least half of them have been prematurely reset.
++ void maybeDrainDueToPrematureResets();
+
+ enum class DrainState { NotDraining, Draining, Closing };
+
+@@ -584,6 +601,12 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
+ bool clear_hop_by_hop_response_headers_{true};
+ // The number of requests accumulated on the current connection.
+ uint64_t accumulated_requests_{};
++ // The number of requests closed on the current connection which were
++  // not internally destroyed.
++ uint64_t closed_non_internally_destroyed_requests_{};
++ // The number of requests that received a premature RST_STREAM, according to
++ // the definition given in `isPrematureRstStream()`.
++ uint64_t number_premature_stream_resets_{0};
+ const std::string proxy_name_; // for Proxy-Status.
+
+ const bool refresh_rtt_after_request_{};
+diff --git a/source/common/runtime/runtime_features.cc b/source/common/runtime/runtime_features.cc
+index 8b75d6bbbfb1c..60d594c964319 100644
+--- a/source/common/runtime/runtime_features.cc
++++ b/source/common/runtime/runtime_features.cc
+@@ -91,6 +91,7 @@ RUNTIME_GUARD(envoy_reloadable_features_validate_grpc_header_before_log_grpc_sta
+ RUNTIME_GUARD(envoy_reloadable_features_validate_upstream_headers);
+ RUNTIME_GUARD(envoy_restart_features_explicit_wildcard_resource);
+ RUNTIME_GUARD(envoy_restart_features_remove_runtime_singleton);
++RUNTIME_GUARD(envoy_restart_features_send_goaway_for_premature_rst_streams);
+ RUNTIME_GUARD(envoy_restart_features_udp_read_normalize_addresses);
+ RUNTIME_GUARD(envoy_restart_features_use_apple_api_for_dns_lookups);
+
+diff --git a/test/integration/multiplexed_integration_test.cc b/test/integration/multiplexed_integration_test.cc
+index 190e57065d1c6..b4260ff3fa9ec 100644
+--- a/test/integration/multiplexed_integration_test.cc
++++ b/test/integration/multiplexed_integration_test.cc
+@@ -1,4 +1,5 @@
+ #include <algorithm>
++#include <chrono>
+ #include <memory>
+ #include <string>
+
+@@ -25,6 +26,7 @@
+ #include "test/mocks/http/mocks.h"
+ #include "test/test_common/network_utility.h"
+ #include "test/test_common/printers.h"
++#include "test/test_common/simulated_time_system.h"
+ #include "test/test_common/utility.h"
+
+ #include "gtest/gtest.h"
+@@ -92,6 +94,15 @@ INSTANTIATE_TEST_SUITE_P(IpVersions, MultiplexedIntegrationTest,
+ {Http::CodecType::HTTP1})),
+ HttpProtocolIntegrationTest::protocolTestParamsToString);
+
++class MultiplexedIntegrationTestWithSimulatedTime : public Event::TestUsingSimulatedTime,
++ public MultiplexedIntegrationTest {};
++
++INSTANTIATE_TEST_SUITE_P(IpVersions, MultiplexedIntegrationTestWithSimulatedTime,
++ testing::ValuesIn(HttpProtocolIntegrationTest::getProtocolTestParams(
++ {Http::CodecType::HTTP2, Http::CodecType::HTTP3},
++ {Http::CodecType::HTTP1})),
++ HttpProtocolIntegrationTest::protocolTestParamsToString);
++
+ TEST_P(MultiplexedIntegrationTest, RouterRequestAndResponseWithBodyNoBuffer) {
+ testRouterRequestAndResponseWithBody(1024, 512, false, false);
+ }
+@@ -1076,6 +1087,67 @@ TEST_P(MultiplexedIntegrationTest, GoAway) {
+ EXPECT_EQ("200", response->headers().getStatusValue());
+ }
+
++// TODO(rch): Add a unit test which covers internal redirect handling.
++TEST_P(MultiplexedIntegrationTestWithSimulatedTime, GoAwayAfterTooManyResets) {
++ EXCLUDE_DOWNSTREAM_HTTP3; // Need to wait for the server to reset the stream
++                            // before opening a new one.
++ config_helper_.addRuntimeOverride("envoy.restart_features.send_goaway_for_premature_rst_streams",
++ "true");
++ const int total_streams = 100;
++ config_helper_.addRuntimeOverride("overload.premature_reset_total_stream_count",
++ absl::StrCat(total_streams));
++ initialize();
++
++ Http::TestRequestHeaderMapImpl headers{
++ {":method", "GET"}, {":path", "/healthcheck"}, {":scheme", "http"}, {":authority", "host"}};
++ codec_client_ = makeHttpConnection(lookupPort("http"));
++ for (int i = 0; i < total_streams; ++i) {
++ auto encoder_decoder = codec_client_->startRequest(headers);
++ request_encoder_ = &encoder_decoder.first;
++ auto response = std::move(encoder_decoder.second);
++ codec_client_->sendReset(*request_encoder_);
++ ASSERT_TRUE(response->waitForReset());
++ }
++
++  // Envoy should disconnect the client due to the premature reset check.
++ ASSERT_TRUE(codec_client_->waitForDisconnect());
++ test_server_->waitForCounterEq("http.config_test.downstream_rq_rx_reset", total_streams);
++ test_server_->waitForCounterEq("http.config_test.downstream_rq_too_many_premature_resets", 1);
++}
++
++TEST_P(MultiplexedIntegrationTestWithSimulatedTime, DontGoAwayAfterTooManyResetsForLongStreams) {
++ EXCLUDE_DOWNSTREAM_HTTP3; // Need to wait for the server to reset the stream
++                            // before opening a new one.
++ config_helper_.addRuntimeOverride("envoy.restart_features.send_goaway_for_premature_rst_streams",
++ "true");
++ const int total_streams = 100;
++ const int stream_lifetime_seconds = 2;
++ config_helper_.addRuntimeOverride("overload.premature_reset_total_stream_count",
++ absl::StrCat(total_streams));
++
++ config_helper_.addRuntimeOverride("overload.premature_reset_min_stream_lifetime_seconds",
++ absl::StrCat(stream_lifetime_seconds));
++
++ initialize();
++
++ Http::TestRequestHeaderMapImpl headers{
++ {":method", "GET"}, {":path", "/healthcheck"}, {":scheme", "http"}, {":authority", "host"}};
++ codec_client_ = makeHttpConnection(lookupPort("http"));
++
++ std::string request_counter = "http.config_test.downstream_rq_total";
++ std::string reset_counter = "http.config_test.downstream_rq_rx_reset";
++ for (int i = 0; i < total_streams * 2; ++i) {
++ auto encoder_decoder = codec_client_->startRequest(headers);
++ request_encoder_ = &encoder_decoder.first;
++ auto response = std::move(encoder_decoder.second);
++ test_server_->waitForCounterEq(request_counter, i + 1);
++ timeSystem().advanceTimeWait(std::chrono::seconds(2 * stream_lifetime_seconds));
++ codec_client_->sendReset(*request_encoder_);
++ ASSERT_TRUE(response->waitForReset());
++ test_server_->waitForCounterEq(reset_counter, i + 1);
++ }
++}
++
+ TEST_P(MultiplexedIntegrationTest, Trailers) { testTrailers(1024, 2048, false, false); }
+
+ TEST_P(MultiplexedIntegrationTest, TrailersGiantBody) {
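The second patch below adds a complementary knob, http.max_requests_per_io_cycle, which bounds how many new requests a single connection can push into the filter chain per I/O cycle; requests over the limit are deferred, and the limit is disabled by default. A hedged sketch of enabling it through the same runtime mechanism; the value 1 mirrors the suggestion in that patch's changelog for deployments facing abusive HTTP/2 or HTTP/3 connections:

layered_runtime:
  layers:
  - name: rapid_reset_mitigation   # illustrative layer name
    static_layer:
      # Defer all but one new request per connection to subsequent I/O cycles.
      http.max_requests_per_io_cycle: 1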
diff --git a/0004-limit-http-requests-per-io-cycle.patch b/0004-limit-http-requests-per-io-cycle.patch
new file mode 100644
index 000000000000..702e799513ec
--- /dev/null
+++ b/0004-limit-http-requests-per-io-cycle.patch
@@ -0,0 +1,821 @@
+From 2e4228b0ee73ae640c92e0974c91e251997a3d2f Mon Sep 17 00:00:00 2001
+From: Yan Avlasov <yavlasov@google.com>
+Date: Sat, 30 Sep 2023 13:58:46 +0000
+Subject: [PATCH] Limit on the number of HTTP requests processed from a
+ connection in an I/O cycle
+
+Signed-off-by: Yan Avlasov <yavlasov@google.com>
+---
+ changelogs/current.yaml | 7 +
+ source/common/http/conn_manager_impl.cc | 89 ++++++-
+ source/common/http/conn_manager_impl.h | 25 +-
+ test/common/http/conn_manager_impl_test_2.cc | 245 ++++++++++++++++++
+ .../http/conn_manager_impl_test_base.cc | 19 ++
+ .../common/http/conn_manager_impl_test_base.h | 2 +
+ test/common/http/http2/http2_frame.cc | 12 +-
+ test/common/http/http2/http2_frame.h | 14 +-
+ .../multiplexed_integration_test.cc | 170 ++++++++++++
+ 9 files changed, 569 insertions(+), 14 deletions(-)
+
+diff --git a/changelogs/current.yaml b/changelogs/current.yaml
+index b5d45089f337..86d0eac2339f 100644
+--- a/changelogs/current.yaml
++++ b/changelogs/current.yaml
+@@ -10,6 +10,13 @@
+ resets is applied. The connection is disconnected if more than 50% of resets are premature.
+ Setting the runtime key ``envoy.restart_features.send_goaway_for_premature_rst_streams`` to ``false`` completely disables
+ this check.
++- area: http
++  change: |
++ Add runtime flag ``http.max_requests_per_io_cycle`` for setting the limit on the number of HTTP requests processed
++ from a single connection in a single I/O cycle. Requests over this limit are processed in subsequent I/O cycles. This
++    mitigates CPU starvation by connections that simultaneously send a high number of requests by allowing requests from other
++ connections to make progress. This runtime value can be set to 1 in the presence of abusive HTTP/2 or HTTP/3 connections.
++ By default this limit is disabled.
+ - area: build
+ change: |
+ Moved the subset, ring_hash, and maglev LB code into extensions. If you use these load balancers and override
+diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc
+index 021c55bc4cca..bf0439708413 100644
+--- a/source/common/http/conn_manager_impl.cc
++++ b/source/common/http/conn_manager_impl.cc
+@@ -60,6 +60,10 @@ const absl::string_view ConnectionManagerImpl::PrematureResetTotalStreamCountKey
+ "overload.premature_reset_total_stream_count";
+ const absl::string_view ConnectionManagerImpl::PrematureResetMinStreamLifetimeSecondsKey =
+ "overload.premature_reset_min_stream_lifetime_seconds";
++// Runtime key for maximum number of requests that can be processed from a single connection per
++// I/O cycle. Requests over this limit are deferred until the next I/O cycle.
++const absl::string_view ConnectionManagerImpl::MaxRequestsPerIoCycle =
++ "http.max_requests_per_io_cycle";
+
+ bool requestWasConnect(const RequestHeaderMapSharedPtr& headers, Protocol protocol) {
+ if (!headers) {
+@@ -116,6 +120,8 @@ ConnectionManagerImpl::ConnectionManagerImpl(ConnectionManagerConfig& config,
+ /*node_id=*/local_info_.node().id(),
+ /*server_name=*/config_.serverName(),
+ /*proxy_status_config=*/config_.proxyStatusConfig())),
++ max_requests_during_dispatch_(
++ runtime_.snapshot().getInteger(ConnectionManagerImpl::MaxRequestsPerIoCycle, UINT32_MAX)),
+ refresh_rtt_after_request_(
+ Runtime::runtimeFeatureEnabled("envoy.reloadable_features.refresh_rtt_after_request")) {}
+
+@@ -128,6 +134,10 @@ const ResponseHeaderMap& ConnectionManagerImpl::continueHeader() {
+ void ConnectionManagerImpl::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) {
+ read_callbacks_ = &callbacks;
+ dispatcher_ = &callbacks.connection().dispatcher();
++ if (max_requests_during_dispatch_ != UINT32_MAX) {
++ deferred_request_processing_callback_ =
++ dispatcher_->createSchedulableCallback([this]() -> void { onDeferredRequestProcessing(); });
++ }
+
+ stats_.named_.downstream_cx_total_.inc();
+ stats_.named_.downstream_cx_active_.inc();
+@@ -454,6 +464,7 @@ void ConnectionManagerImpl::createCodec(Buffer::Instance& data) {
+ }
+
+ Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool) {
++ requests_during_dispatch_count_ = 0;
+ if (!codec_) {
+ // Http3 codec should have been instantiated by now.
+ createCodec(data);
+@@ -1366,7 +1377,12 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPt
+ traceRequest();
+ }
+
+- filter_manager_.decodeHeaders(*request_headers_, end_stream);
++ if (!connection_manager_.shouldDeferRequestProxyingToNextIoCycle()) {
++ filter_manager_.decodeHeaders(*request_headers_, end_stream);
++ } else {
++ state_.deferred_to_next_io_iteration_ = true;
++ state_.deferred_end_stream_ = end_stream;
++ }
+
+ // Reset it here for both global and overridden cases.
+ resetIdleTimer();
+@@ -1433,8 +1449,15 @@ void ConnectionManagerImpl::ActiveStream::decodeData(Buffer::Instance& data, boo
+ connection_manager_.read_callbacks_->connection().dispatcher());
+ maybeEndDecode(end_stream);
+ filter_manager_.streamInfo().addBytesReceived(data.length());
+-
+- filter_manager_.decodeData(data, end_stream);
++ if (!state_.deferred_to_next_io_iteration_) {
++ filter_manager_.decodeData(data, end_stream);
++ } else {
++ if (!deferred_data_) {
++ deferred_data_ = std::make_unique<Buffer::OwnedImpl>();
++ }
++ deferred_data_->move(data);
++ state_.deferred_end_stream_ = end_stream;
++ }
+ }
+
+ void ConnectionManagerImpl::ActiveStream::decodeTrailers(RequestTrailerMapPtr&& trailers) {
+@@ -1450,7 +1473,9 @@ void ConnectionManagerImpl::ActiveStream::decodeTrailers(RequestTrailerMapPtr&&
+ return;
+ }
+ maybeEndDecode(true);
+- filter_manager_.decodeTrailers(*request_trailers_);
++ if (!state_.deferred_to_next_io_iteration_) {
++ filter_manager_.decodeTrailers(*request_trailers_);
++ }
+ }
+
+ void ConnectionManagerImpl::ActiveStream::decodeMetadata(MetadataMapPtr&& metadata_map) {
+@@ -2158,5 +2183,61 @@ void ConnectionManagerImpl::ActiveStream::resetStream(Http::StreamResetReason, a
+ connection_manager_.doEndStream(*this);
+ }
+
++bool ConnectionManagerImpl::ActiveStream::onDeferredRequestProcessing() {
++ // TODO(yanavlasov): Merge this with the filter manager continueIteration() method
++ if (!state_.deferred_to_next_io_iteration_) {
++ return false;
++ }
++ state_.deferred_to_next_io_iteration_ = false;
++ bool end_stream =
++ state_.deferred_end_stream_ && deferred_data_ == nullptr && request_trailers_ == nullptr;
++ filter_manager_.decodeHeaders(*request_headers_, end_stream);
++ if (end_stream) {
++ return true;
++ }
++ if (deferred_data_ != nullptr) {
++ end_stream = state_.deferred_end_stream_ && request_trailers_ == nullptr;
++ filter_manager_.decodeData(*deferred_data_, end_stream);
++ }
++ if (request_trailers_ != nullptr) {
++ filter_manager_.decodeTrailers(*request_trailers_);
++ }
++ return true;
++}
++
++bool ConnectionManagerImpl::shouldDeferRequestProxyingToNextIoCycle() {
++ // Do not defer this stream if stream deferral is disabled
++ if (deferred_request_processing_callback_ == nullptr) {
++ return false;
++ }
++ // Defer this stream if there are already deferred streams, so they are not
++ // processed out of order
++ if (deferred_request_processing_callback_->enabled()) {
++ return true;
++ }
++ ++requests_during_dispatch_count_;
++ bool defer = requests_during_dispatch_count_ > max_requests_during_dispatch_;
++ if (defer) {
++ deferred_request_processing_callback_->scheduleCallbackNextIteration();
++ }
++ return defer;
++}
++
++void ConnectionManagerImpl::onDeferredRequestProcessing() {
++ requests_during_dispatch_count_ = 1; // 1 stream is always let through
++ // Streams are inserted at the head of the list. As such process deferred
++ // streams at the back of the list first.
++ for (auto reverse_iter = streams_.rbegin(); reverse_iter != streams_.rend();) {
++ auto& stream_ptr = *reverse_iter;
++ // Move the iterator to the next item in case the `onDeferredRequestProcessing` call removes the
++ // stream from the list.
++ ++reverse_iter;
++ bool was_deferred = stream_ptr->onDeferredRequestProcessing();
++ if (was_deferred && shouldDeferRequestProxyingToNextIoCycle()) {
++ break;
++ }
++ }
++}
++
+ } // namespace Http
+ } // namespace Envoy
+diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h
+index dad494c953bc..e79a6a81c082 100644
+--- a/source/common/http/conn_manager_impl.h
++++ b/source/common/http/conn_manager_impl.h
+@@ -122,6 +122,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
+ // The minimum lifetime of a stream, in seconds, in order not to be considered
+ // prematurely closed.
+ static const absl::string_view PrematureResetMinStreamLifetimeSecondsKey;
++ static const absl::string_view MaxRequestsPerIoCycle;
+
+ private:
+ struct ActiveStream;
+@@ -345,7 +346,8 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
+ : codec_saw_local_complete_(false), codec_encode_complete_(false),
+ on_reset_stream_called_(false), is_zombie_stream_(false), saw_connection_close_(false),
+ successful_upgrade_(false), is_internally_destroyed_(false),
+- is_internally_created_(false), is_tunneling_(false), decorated_propagate_(true) {}
++ is_internally_created_(false), is_tunneling_(false), decorated_propagate_(true),
++ deferred_to_next_io_iteration_(false) {}
+
+ // It's possibly for the codec to see the completed response but not fully
+ // encode it.
+@@ -371,6 +373,14 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
+ bool is_tunneling_ : 1;
+
+ bool decorated_propagate_ : 1;
++
++ // Indicates that sending headers to the filter manager is deferred to the
++    // next I/O cycle. If data or trailers are received while this flag is set,
++ // they are deferred too.
++ // TODO(yanavlasov): encapsulate the entire state of deferred streams into a separate
++ // structure, so it can be atomically created and cleared.
++ bool deferred_to_next_io_iteration_ : 1;
++ bool deferred_end_stream_ : 1;
+ };
+
+ bool canDestroyStream() const {
+@@ -418,6 +428,11 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
+ // HTTP connection manager configuration, then the entire connection is closed.
+ bool validateTrailers();
+
++ // Dispatch deferred headers, body and trailers to the filter manager.
++ // Return true if this stream was deferred and dispatched pending headers, body and trailers (if
++ // present). Return false if this stream was not deferred.
++ bool onDeferredRequestProcessing();
++
+ ConnectionManagerImpl& connection_manager_;
+ OptRef<const TracingConnectionManagerConfig> connection_manager_tracing_config_;
+ // TODO(snowp): It might make sense to move this to the FilterManager to avoid storing it in
+@@ -504,6 +519,8 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
+ const Tracing::CustomTagMap* customTags() const override;
+ bool verbose() const override;
+ uint32_t maxPathTagLength() const override;
++
++ std::unique_ptr<Buffer::OwnedImpl> deferred_data_;
+ };
+
+ using ActiveStreamPtr = std::unique_ptr<ActiveStream>;
+@@ -562,6 +579,9 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
+   // and at least half of them have been prematurely reset.
+ void maybeDrainDueToPrematureResets();
+
++ bool shouldDeferRequestProxyingToNextIoCycle();
++ void onDeferredRequestProcessing();
++
+ enum class DrainState { NotDraining, Draining, Closing };
+
+ ConnectionManagerConfig& config_;
+@@ -608,6 +628,9 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
+ // the definition given in `isPrematureRstStream()`.
+ uint64_t number_premature_stream_resets_{0};
+ const std::string proxy_name_; // for Proxy-Status.
++ uint32_t requests_during_dispatch_count_{0};
++ const uint32_t max_requests_during_dispatch_{UINT32_MAX};
++ Event::SchedulableCallbackPtr deferred_request_processing_callback_;
+
+ const bool refresh_rtt_after_request_{};
+ };
+diff --git a/test/common/http/conn_manager_impl_test_2.cc b/test/common/http/conn_manager_impl_test_2.cc
+index 5daf2b1a45e6..078bd1d85ab7 100644
+--- a/test/common/http/conn_manager_impl_test_2.cc
++++ b/test/common/http/conn_manager_impl_test_2.cc
+@@ -11,6 +11,7 @@ using testing::InvokeWithoutArgs;
+ using testing::Mock;
+ using testing::Ref;
+ using testing::Return;
++using testing::ReturnArg;
+ using testing::ReturnRef;
+
+ namespace Envoy {
+@@ -3767,5 +3768,249 @@ TEST_F(HttpConnectionManagerImplTest, NoProxyProtocolAdded) {
+ // Clean up.
+ filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose);
+ }
++
++// Validate that deferred streams are processed with a variety of
++// headers, data and trailers arriving in the same I/O cycle.
++TEST_F(HttpConnectionManagerImplTest, LimitWorkPerIOCycle) {
++ const int kRequestsSentPerIOCycle = 100;
++ EXPECT_CALL(runtime_.snapshot_, getInteger(_, _)).WillRepeatedly(ReturnArg<1>());
++ // Process 1 request per I/O cycle
++ auto* deferred_request_callback = enableStreamsPerIoLimit(1);
++ setup(false, "");
++
++ // Store the basic request encoder during filter chain setup.
++ std::vector<std::shared_ptr<MockStreamDecoderFilter>> encoder_filters;
++ int decode_headers_call_count = 0;
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ std::shared_ptr<MockStreamDecoderFilter> filter(new NiceMock<MockStreamDecoderFilter>());
++
++ // Each 4th request is headers only
++ EXPECT_CALL(*filter, decodeHeaders(_, i % 4 == 0 ? true : false))
++ .WillRepeatedly(Invoke([&](RequestHeaderMap&, bool) -> FilterHeadersStatus {
++ ++decode_headers_call_count;
++ return FilterHeadersStatus::StopIteration;
++ }));
++
++ // Each 1st request is headers and data only
++ // Each 2nd request is headers, data and trailers
++ if (i % 4 == 1 || i % 4 == 2) {
++ EXPECT_CALL(*filter, decodeData(_, i % 4 == 1 ? true : false))
++ .WillOnce(Return(FilterDataStatus::StopIterationNoBuffer));
++ }
++
++ // Each 3rd request is headers and trailers (no data)
++ if (i % 4 == 2 || i % 4 == 3) {
++ EXPECT_CALL(*filter, decodeTrailers(_)).WillOnce(Return(FilterTrailersStatus::StopIteration));
++ }
++
++ EXPECT_CALL(*filter, setDecoderFilterCallbacks(_));
++ encoder_filters.push_back(std::move(filter));
++ }
++
++ uint64_t random_value = 0;
++ EXPECT_CALL(random_, random()).WillRepeatedly(Invoke([&random_value]() {
++ return random_value++;
++ }));
++
++ EXPECT_CALL(filter_factory_, createFilterChain(_))
++ .Times(kRequestsSentPerIOCycle)
++ .WillRepeatedly(Invoke([&encoder_filters](FilterChainManager& manager) -> bool {
++ static int index = 0;
++ int i = index++;
++ FilterFactoryCb factory([&encoder_filters, i](FilterChainFactoryCallbacks& callbacks) {
++ callbacks.addStreamDecoderFilter(encoder_filters[i]);
++ });
++ manager.applyFilterFactoryCb({}, factory);
++ return true;
++ }));
++
++ EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, deferredDelete_(_))
++ .Times(kRequestsSentPerIOCycle);
++
++ std::vector<NiceMock<MockResponseEncoder>> response_encoders(kRequestsSentPerIOCycle);
++ for (auto& encoder : response_encoders) {
++ EXPECT_CALL(encoder, getStream()).WillRepeatedly(ReturnRef(encoder.stream_));
++ }
++
++ EXPECT_CALL(*codec_, dispatch(_))
++ .WillRepeatedly(Invoke([&](Buffer::Instance& data) -> Http::Status {
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ decoder_ = &conn_manager_->newStream(response_encoders[i]);
++
++ RequestHeaderMapPtr headers{new TestRequestHeaderMapImpl{
++ {":authority", "host"}, {":path", "/"}, {":method", "GET"}}};
++
++ RequestTrailerMapPtr trailers{
++ new TestRequestTrailerMapImpl{{"key1", "value1"}, {"key2", "value2"}}};
++
++ Buffer::OwnedImpl data("data");
++
++ switch (i % 4) {
++ case 0:
++ decoder_->decodeHeaders(std::move(headers), true);
++ break;
++ case 1:
++ decoder_->decodeHeaders(std::move(headers), false);
++ decoder_->decodeData(data, true);
++ break;
++ case 2:
++ decoder_->decodeHeaders(std::move(headers), false);
++ decoder_->decodeData(data, false);
++ decoder_->decodeTrailers(std::move(trailers));
++ break;
++ case 3:
++ decoder_->decodeHeaders(std::move(headers), false);
++ decoder_->decodeTrailers(std::move(trailers));
++ break;
++ }
++ }
++
++ data.drain(4);
++ return Http::okStatus();
++ }));
++
++ // Kick off the incoming data.
++ Buffer::OwnedImpl fake_input("1234");
++ conn_manager_->onData(fake_input, false);
++
++ EXPECT_TRUE(deferred_request_callback->enabled_);
++ // Only one request should go through the filter chain
++ ASSERT_EQ(decode_headers_call_count, 1);
++
++  // Let other requests go through the filter chain. Call expectations will fail
++ // if this is not the case.
++ int deferred_request_count = 0;
++ while (deferred_request_callback->enabled_) {
++ deferred_request_callback->invokeCallback();
++ ++deferred_request_count;
++ }
++
++ ASSERT_EQ(deferred_request_count, kRequestsSentPerIOCycle);
++
++ for (auto& filter : encoder_filters) {
++ ResponseHeaderMapPtr response_headers{new TestResponseHeaderMapImpl{{":status", "200"}}};
++ filter->callbacks_->streamInfo().setResponseCodeDetails("");
++ filter->callbacks_->encodeHeaders(std::move(response_headers), true, "details");
++ }
++
++ EXPECT_EQ(kRequestsSentPerIOCycle, stats_.named_.downstream_rq_2xx_.value());
++ EXPECT_EQ(kRequestsSentPerIOCycle, listener_stats_.downstream_rq_2xx_.value());
++ EXPECT_EQ(kRequestsSentPerIOCycle, stats_.named_.downstream_rq_completed_.value());
++ EXPECT_EQ(kRequestsSentPerIOCycle, listener_stats_.downstream_rq_completed_.value());
++}
++
++TEST_F(HttpConnectionManagerImplTest, StreamDeferralPreservesOrder) {
++ EXPECT_CALL(runtime_.snapshot_, getInteger(_, _)).WillRepeatedly(ReturnArg<1>());
++ // Process 1 request per I/O cycle
++ auto* deferred_request_callback = enableStreamsPerIoLimit(1);
++ setup(false, "");
++
++ std::vector<std::shared_ptr<MockStreamDecoderFilter>> encoder_filters;
++ int expected_request_id = 0;
++ const Http::LowerCaseString request_id_header(absl::string_view("request-id"));
++ // Two requests are processed in 2 I/O reads
++ const int TotalRequests = 2 * 2;
++ for (int i = 0; i < TotalRequests; ++i) {
++ std::shared_ptr<MockStreamDecoderFilter> filter(new NiceMock<MockStreamDecoderFilter>());
++
++ EXPECT_CALL(*filter, decodeHeaders(_, true))
++ .WillRepeatedly(Invoke([&](RequestHeaderMap& headers, bool) -> FilterHeadersStatus {
++ // Check that requests are decoded in expected order
++ int request_id = 0;
++ ASSERT(absl::SimpleAtoi(headers.get(request_id_header)[0]->value().getStringView(),
++ &request_id));
++ ASSERT(request_id == expected_request_id);
++ ++expected_request_id;
++ return FilterHeadersStatus::StopIteration;
++ }));
++
++ EXPECT_CALL(*filter, setDecoderFilterCallbacks(_));
++ encoder_filters.push_back(std::move(filter));
++ }
++
++ uint64_t random_value = 0;
++ EXPECT_CALL(random_, random()).WillRepeatedly(Invoke([&random_value]() {
++ return random_value++;
++ }));
++
++ EXPECT_CALL(filter_factory_, createFilterChain(_))
++ .Times(TotalRequests)
++ .WillRepeatedly(Invoke([&encoder_filters](FilterChainManager& manager) -> bool {
++ static int index = 0;
++ int i = index++;
++ FilterFactoryCb factory([&encoder_filters, i](FilterChainFactoryCallbacks& callbacks) {
++ callbacks.addStreamDecoderFilter(encoder_filters[i]);
++ });
++ manager.applyFilterFactoryCb({}, factory);
++ return true;
++ }));
++
++ EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, deferredDelete_(_)).Times(TotalRequests);
++
++ std::vector<NiceMock<MockResponseEncoder>> response_encoders(TotalRequests);
++ for (auto& encoder : response_encoders) {
++ EXPECT_CALL(encoder, getStream()).WillRepeatedly(ReturnRef(encoder.stream_));
++ }
++ auto response_encoders_iter = response_encoders.begin();
++
++ int request_id = 0;
++ EXPECT_CALL(*codec_, dispatch(_))
++ .WillRepeatedly(Invoke([&](Buffer::Instance& data) -> Http::Status {
++ // The second request should be deferred
++ for (int i = 0; i < 2; ++i) {
++ decoder_ = &conn_manager_->newStream(*response_encoders_iter);
++ ++response_encoders_iter;
++
++ RequestHeaderMapPtr headers{
++ new TestRequestHeaderMapImpl{{":authority", "host"},
++ {":path", "/"},
++ {":method", "GET"},
++ {"request-id", absl::StrCat(request_id)}}};
++
++ ++request_id;
++ decoder_->decodeHeaders(std::move(headers), true);
++ }
++
++ data.drain(4);
++ return Http::okStatus();
++ }));
++
++ // Kick off the incoming data.
++ Buffer::OwnedImpl fake_input("1234");
++ conn_manager_->onData(fake_input, false);
++
++ EXPECT_TRUE(deferred_request_callback->enabled_);
++ // Only one request should go through the filter chain
++ ASSERT_EQ(expected_request_id, 1);
++
++ // Test arrival of another request. New request is read from the socket before deferred callbacks.
++ Buffer::OwnedImpl fake_input2("1234");
++ conn_manager_->onData(fake_input2, false);
++
++  // No requests from the second read should go through as there are deferred streams present.
++ ASSERT_EQ(expected_request_id, 1);
++
++  // Let other requests go through the filter chain. Call expectations will fail
++ // if this is not the case.
++ int deferred_request_count = 0;
++ while (deferred_request_callback->enabled_) {
++ deferred_request_callback->invokeCallback();
++ ++deferred_request_count;
++ }
++
++ ASSERT_EQ(deferred_request_count, TotalRequests);
++
++ for (auto& filter : encoder_filters) {
++ ResponseHeaderMapPtr response_headers{new TestResponseHeaderMapImpl{{":status", "200"}}};
++ filter->callbacks_->streamInfo().setResponseCodeDetails("");
++ filter->callbacks_->encodeHeaders(std::move(response_headers), true, "details");
++ }
++
++ EXPECT_EQ(TotalRequests, stats_.named_.downstream_rq_2xx_.value());
++ EXPECT_EQ(TotalRequests, listener_stats_.downstream_rq_2xx_.value());
++ EXPECT_EQ(TotalRequests, stats_.named_.downstream_rq_completed_.value());
++ EXPECT_EQ(TotalRequests, listener_stats_.downstream_rq_completed_.value());
++}
++
+ } // namespace Http
+ } // namespace Envoy
+diff --git a/test/common/http/conn_manager_impl_test_base.cc b/test/common/http/conn_manager_impl_test_base.cc
+index 1c5053246e44..641ca6fae082 100644
+--- a/test/common/http/conn_manager_impl_test_base.cc
++++ b/test/common/http/conn_manager_impl_test_base.cc
+@@ -78,6 +78,7 @@ void HttpConnectionManagerImplMixin::setup(bool ssl, const std::string& server_n
+ conn_manager_ = std::make_unique<ConnectionManagerImpl>(
+ *this, drain_close_, random_, http_context_, runtime_, local_info_, cluster_manager_,
+ overload_manager_, test_time_.timeSystem());
++
+ conn_manager_->initializeReadFilterCallbacks(filter_callbacks_);
+
+ if (tracing) {
+@@ -370,5 +371,23 @@ void HttpConnectionManagerImplMixin::expectUhvTrailerCheck(
+ }));
+ }
+
++Event::MockSchedulableCallback*
++HttpConnectionManagerImplMixin::enableStreamsPerIoLimit(uint32_t limit) {
++ EXPECT_CALL(runtime_.snapshot_, getInteger("http.max_requests_per_io_cycle", _))
++ .WillOnce(Return(limit));
++
++ // Expect HCM to create and set schedulable callback
++ auto* deferred_request_callback =
++ new Event::MockSchedulableCallback(&filter_callbacks_.connection_.dispatcher_);
++ EXPECT_CALL(*deferred_request_callback, enabled())
++ .WillRepeatedly(
++ Invoke([deferred_request_callback]() { return deferred_request_callback->enabled_; }));
++ EXPECT_CALL(*deferred_request_callback, scheduleCallbackNextIteration())
++ .WillRepeatedly(
++ Invoke([deferred_request_callback]() { deferred_request_callback->enabled_ = true; }));
++
++ return deferred_request_callback;
++}
++
+ } // namespace Http
+ } // namespace Envoy
+diff --git a/test/common/http/conn_manager_impl_test_base.h b/test/common/http/conn_manager_impl_test_base.h
+index 99fdc9344118..10a4ff15b54a 100644
+--- a/test/common/http/conn_manager_impl_test_base.h
++++ b/test/common/http/conn_manager_impl_test_base.h
+@@ -202,6 +202,8 @@ class HttpConnectionManagerImplMixin : public ConnectionManagerConfig {
+ HeaderValidator::TransformationResult transformation_result,
+ bool expect_response = true);
+
++ Event::MockSchedulableCallback* enableStreamsPerIoLimit(uint32_t limit);
++
+ Envoy::Event::SimulatedTimeSystem test_time_;
+ NiceMock<Router::MockRouteConfigProvider> route_config_provider_;
+ std::shared_ptr<Router::MockConfig> route_config_{new NiceMock<Router::MockConfig>()};
+diff --git a/test/common/http/http2/http2_frame.cc b/test/common/http/http2/http2_frame.cc
+index c5172938a804..46ba3751ba24 100644
+--- a/test/common/http/http2/http2_frame.cc
++++ b/test/common/http/http2/http2_frame.cc
+@@ -341,7 +341,11 @@ Http2Frame Http2Frame::makeRequest(uint32_t stream_index, absl::string_view host
+ makeNetworkOrderStreamId(stream_index));
+ frame.appendStaticHeader(StaticHeaderIndex::MethodGet);
+ frame.appendStaticHeader(StaticHeaderIndex::SchemeHttps);
+- frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Path, path);
++ if (path.empty() || path == "/") {
++ frame.appendStaticHeader(StaticHeaderIndex::Path);
++ } else {
++ frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Path, path);
++ }
+ frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Authority, host);
+ frame.adjustPayloadSize();
+ return frame;
+@@ -365,7 +369,11 @@ Http2Frame Http2Frame::makePostRequest(uint32_t stream_index, absl::string_view
+ makeNetworkOrderStreamId(stream_index));
+ frame.appendStaticHeader(StaticHeaderIndex::MethodPost);
+ frame.appendStaticHeader(StaticHeaderIndex::SchemeHttps);
+- frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Path, path);
++ if (path.empty() || path == "/") {
++ frame.appendStaticHeader(StaticHeaderIndex::Path);
++ } else {
++ frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Path, path);
++ }
+ frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Authority, host);
+ frame.adjustPayloadSize();
+ return frame;
+diff --git a/test/common/http/http2/http2_frame.h b/test/common/http/http2/http2_frame.h
+index 7fdf510ea256..5df3375c1661 100644
+--- a/test/common/http/http2/http2_frame.h
++++ b/test/common/http/http2/http2_frame.h
+@@ -253,6 +253,13 @@ class Http2Frame {
+ ConstIterator end() const { return data_.end(); }
+ bool empty() const { return data_.empty(); }
+
++ void appendHeaderWithoutIndexing(const Header& header);
++ // This method updates payload length in the HTTP2 header based on the size of the data_
++ void adjustPayloadSize() {
++ ASSERT(size() >= HeaderSize);
++ setPayloadSize(size() - HeaderSize);
++ }
++
+ private:
+ void buildHeader(Type type, uint32_t payload_size = 0, uint8_t flags = 0, uint32_t stream_id = 0);
+ void setPayloadSize(uint32_t size);
+@@ -272,15 +279,8 @@ class Http2Frame {
+ // Headers are directly encoded
+ void appendStaticHeader(StaticHeaderIndex index);
+ void appendHeaderWithoutIndexing(StaticHeaderIndex index, absl::string_view value);
+- void appendHeaderWithoutIndexing(const Header& header);
+ void appendEmptyHeader();
+
+- // This method updates payload length in the HTTP2 header based on the size of the data_
+- void adjustPayloadSize() {
+- ASSERT(size() >= HeaderSize);
+- setPayloadSize(size() - HeaderSize);
+- }
+-
+ DataContainer data_;
+ };
+
+diff --git a/test/integration/multiplexed_integration_test.cc b/test/integration/multiplexed_integration_test.cc
+index b4260ff3fa9e..edce9a1838d9 100644
+--- a/test/integration/multiplexed_integration_test.cc
++++ b/test/integration/multiplexed_integration_test.cc
+@@ -2130,6 +2130,176 @@ TEST_P(Http2FrameIntegrationTest, AccessLogOfWireBytesIfResponseSizeGreaterThanW
+ tcp_client_->close();
+ }
+
++TEST_P(Http2FrameIntegrationTest, MultipleHeaderOnlyRequests) {
++ const int kRequestsSentPerIOCycle = 20;
++ autonomous_upstream_ = true;
++ config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1");
++ beginSession();
++
++ std::string buffer;
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ auto request = Http2Frame::makeRequest(Http2Frame::makeClientStreamId(i), "a", "/",
++ {{"response_data_blocks", "0"}, {"no_trailers", "1"}});
++ absl::StrAppend(&buffer, std::string(request));
++ }
++
++ ASSERT_TRUE(tcp_client_->write(buffer, false, false));
++
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ auto frame = readFrame();
++ EXPECT_EQ(Http2Frame::Type::Headers, frame.type());
++ EXPECT_EQ(Http2Frame::ResponseStatus::Ok, frame.responseStatus());
++ }
++ tcp_client_->close();
++}
++
++TEST_P(Http2FrameIntegrationTest, MultipleRequests) {
++ const int kRequestsSentPerIOCycle = 20;
++ autonomous_upstream_ = true;
++ config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1");
++ beginSession();
++
++ std::string buffer;
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ auto request =
++ Http2Frame::makePostRequest(Http2Frame::makeClientStreamId(i), "a", "/",
++ {{"response_data_blocks", "0"}, {"no_trailers", "1"}});
++ absl::StrAppend(&buffer, std::string(request));
++ }
++
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ auto data = Http2Frame::makeDataFrame(Http2Frame::makeClientStreamId(i), "a",
++ Http2Frame::DataFlags::EndStream);
++ absl::StrAppend(&buffer, std::string(data));
++ }
++
++ ASSERT_TRUE(tcp_client_->write(buffer, false, false));
++
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ auto frame = readFrame();
++ EXPECT_EQ(Http2Frame::Type::Headers, frame.type());
++ EXPECT_EQ(Http2Frame::ResponseStatus::Ok, frame.responseStatus());
++ }
++ tcp_client_->close();
++}
++
++TEST_P(Http2FrameIntegrationTest, MultipleRequestsWithTrailers) {
++ const int kRequestsSentPerIOCycle = 20;
++ autonomous_upstream_ = true;
++ config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1");
++ beginSession();
++
++ std::string buffer;
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ auto request =
++ Http2Frame::makePostRequest(Http2Frame::makeClientStreamId(i), "a", "/",
++ {{"response_data_blocks", "0"}, {"no_trailers", "1"}});
++ absl::StrAppend(&buffer, std::string(request));
++ }
++
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ auto data = Http2Frame::makeDataFrame(Http2Frame::makeClientStreamId(i), "a");
++ absl::StrAppend(&buffer, std::string(data));
++ }
++
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ auto trailers = Http2Frame::makeEmptyHeadersFrame(
++ Http2Frame::makeClientStreamId(i),
++ static_cast<Http2Frame::HeadersFlags>(Http::Http2::orFlags(
++ Http2Frame::HeadersFlags::EndStream, Http2Frame::HeadersFlags::EndHeaders)));
++ trailers.appendHeaderWithoutIndexing({"k", "v"});
++ trailers.adjustPayloadSize();
++ absl::StrAppend(&buffer, std::string(trailers));
++ }
++
++ ASSERT_TRUE(tcp_client_->write(buffer, false, false));
++
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ auto frame = readFrame();
++ EXPECT_EQ(Http2Frame::Type::Headers, frame.type());
++ EXPECT_EQ(Http2Frame::ResponseStatus::Ok, frame.responseStatus());
++ }
++ tcp_client_->close();
++}
++
++TEST_P(Http2FrameIntegrationTest, MultipleHeaderOnlyRequestsFollowedByReset) {
++ // This number of requests stays below premature reset detection.
++ const int kRequestsSentPerIOCycle = 20;
++ config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1");
++ beginSession();
++
++ std::string buffer;
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ auto request = Http2Frame::makeRequest(Http2Frame::makeClientStreamId(i), "a", "/",
++ {{"response_data_blocks", "0"}, {"no_trailers", "1"}});
++ absl::StrAppend(&buffer, std::string(request));
++ }
++
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ auto reset = Http2Frame::makeResetStreamFrame(Http2Frame::makeClientStreamId(i),
++ Http2Frame::ErrorCode::Cancel);
++ absl::StrAppend(&buffer, std::string(reset));
++ }
++
++ ASSERT_TRUE(tcp_client_->write(buffer, false, false));
++ test_server_->waitForCounterEq("http.config_test.downstream_rq_rx_reset",
++ kRequestsSentPerIOCycle);
++ // Client should remain connected
++ ASSERT_TRUE(tcp_client_->connected());
++ tcp_client_->close();
++}
++
++TEST_P(Http2FrameIntegrationTest, ResettingDeferredRequestsTriggersPrematureResetCheck) {
++ const int kRequestsSentPerIOCycle = 20;
++ // Set premature stream count to the number of streams we are about to send
++ config_helper_.addRuntimeOverride("overload.premature_reset_total_stream_count", "20");
++ config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1");
++ beginSession();
++
++ std::string buffer;
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ auto request = Http2Frame::makeRequest(Http2Frame::makeClientStreamId(i), "a", "/",
++ {{"response_data_blocks", "0"}, {"no_trailers", "1"}});
++ absl::StrAppend(&buffer, std::string(request));
++ }
++
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ auto reset = Http2Frame::makeResetStreamFrame(Http2Frame::makeClientStreamId(i),
++ Http2Frame::ErrorCode::Cancel);
++ absl::StrAppend(&buffer, std::string(reset));
++ }
++
++ ASSERT_TRUE(tcp_client_->write(buffer, false, false));
++ // Envoy should close the client connection due to too many premature resets
++ tcp_client_->waitForDisconnect();
++ test_server_->waitForCounterEq("http.config_test.downstream_rq_too_many_premature_resets", 1);
++}
++
++TEST_P(Http2FrameIntegrationTest, CloseConnectionWithDeferredStreams) {
++ // Use large number of requests to ensure close is detected while there are
++ // still some deferred streams.
++ const int kRequestsSentPerIOCycle = 1000;
++ config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1");
++ // Ensure premature reset detection does not get in the way
++ config_helper_.addRuntimeOverride("overload.premature_reset_total_stream_count", "1001");
++ beginSession();
++
++ std::string buffer;
++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) {
++ auto request = Http2Frame::makeRequest(Http2Frame::makeClientStreamId(i), "a", "/",
++ {{"response_data_blocks", "0"}, {"no_trailers", "1"}});
++ absl::StrAppend(&buffer, std::string(request));
++ }
++
++ ASSERT_TRUE(tcp_client_->write(buffer, false, false));
++ ASSERT_TRUE(tcp_client_->connected());
++ // Drop the downstream connection
++ tcp_client_->close();
++  // Test that Envoy can clean up deferred streams.
++ test_server_->waitForCounterEq("http.config_test.downstream_rq_rx_reset",
++ kRequestsSentPerIOCycle);
++}
++
+ INSTANTIATE_TEST_SUITE_P(IpVersions, Http2FrameIntegrationTest,
+ testing::ValuesIn(Http2FrameIntegrationTest::testParams()),
+ frameIntegrationTestParamToString);
+--- envoy-1.27.0.a/changelogs/current.yaml 2023-10-10 11:36:11.553031312 -0400
++++ envoy-1.27.0.b/changelogs/current.yaml 2023-10-10 11:37:56.601045565 -0400