diff options
author | Dan Fuhry | 2023-10-10 11:42:03 -0400 |
---|---|---|
committer | Dan Fuhry | 2023-10-10 11:42:03 -0400 |
commit | eff67d2594574e06a001f61a98e23fe9e3cad678 (patch) | |
tree | d98732bd622a39d72d7081fa03e737426c077270 | |
parent | c1693f10e3ec96dfd5283228d7b8f4991abbf53e (diff) | |
download | aur-eff67d2594574e06a001f61a98e23fe9e3cad678.tar.gz |
upgpkg: envoyproxy 1.27.0-2
Add patches for CVE-2023-44487, "HTTP/2 Rapid Reset DDoS attack"
See: https://blog.cloudflare.com/technical-breakdown-http2-rapid-reset-ddos-attack/
-rw-r--r-- | 0003-close-http-connections-that-prematurely-reset-stream.patch | 312 | ||||
-rw-r--r-- | 0004-limit-http-requests-per-io-cycle.patch | 821 |
2 files changed, 1133 insertions, 0 deletions
diff --git a/0003-close-http-connections-that-prematurely-reset-stream.patch b/0003-close-http-connections-that-prematurely-reset-stream.patch new file mode 100644 index 000000000000..936730eea7ec --- /dev/null +++ b/0003-close-http-connections-that-prematurely-reset-stream.patch @@ -0,0 +1,312 @@ +From 481a9d54b8f5d73e16c6673eaea1a781c5e5705e Mon Sep 17 00:00:00 2001 +From: Yan Avlasov <yavlasov@google.com> +Date: Thu, 28 Sep 2023 16:11:58 +0000 +Subject: [PATCH] Close HTTP connections that prematurely reset streams + +Signed-off-by: Yan Avlasov <yavlasov@google.com> +--- + changelogs/current.yaml | 9 +++ + source/common/http/conn_manager_config.h | 1 + + source/common/http/conn_manager_impl.cc | 65 +++++++++++++++++ + source/common/http/conn_manager_impl.h | 23 ++++++ + source/common/runtime/runtime_features.cc | 1 + + .../multiplexed_integration_test.cc | 72 +++++++++++++++++++ + 6 files changed, 171 insertions(+) + +diff --git a/changelogs/current.yaml b/changelogs/current.yaml +index 32fc3bc047fd2..b5d45089f337c 100644 +--- a/changelogs/current.yaml ++++ b/changelogs/current.yaml +@@ -1,6 +1,15 @@ + date: July 26, 2023 + + behavior_changes: ++- area: http ++ change: | ++ Close HTTP/2 and HTTP/3 connections that prematurely reset streams. The runtime key ++ ``overload.premature_reset_min_stream_lifetime_seconds`` determines the interval where received stream ++ reset is considered premature (with 1 second default). The runtime key ``overload.premature_reset_total_stream_count``, ++ with the default value of 500, determines the number of requests received from a connection before the check for premature ++ resets is applied. The connection is disconnected if more than 50% of resets are premature. ++ Setting the runtime key ``envoy.restart_features.send_goaway_for_premature_rst_streams`` to ``false`` completely disables ++ this check. + - area: build + change: | + Moved the subset, ring_hash, and maglev LB code into extensions. If you use these load balancers and override +diff --git a/source/common/http/conn_manager_config.h b/source/common/http/conn_manager_config.h +index 52f8188c205c5..a07eb825f789b 100644 +--- a/source/common/http/conn_manager_config.h ++++ b/source/common/http/conn_manager_config.h +@@ -66,6 +66,7 @@ namespace Http { + COUNTER(downstream_rq_rejected_via_ip_detection) \ + COUNTER(downstream_rq_response_before_rq_complete) \ + COUNTER(downstream_rq_rx_reset) \ ++ COUNTER(downstream_rq_too_many_premature_resets) \ + COUNTER(downstream_rq_timeout) \ + COUNTER(downstream_rq_header_timeout) \ + COUNTER(downstream_rq_too_large) \ +diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc +index 5a603aaed80b6..021c55bc4ccae 100644 +--- a/source/common/http/conn_manager_impl.cc ++++ b/source/common/http/conn_manager_impl.cc +@@ -1,5 +1,6 @@ + #include "source/common/http/conn_manager_impl.h" + ++#include <chrono> + #include <cstdint> + #include <functional> + #include <list> +@@ -55,6 +56,11 @@ + namespace Envoy { + namespace Http { + ++const absl::string_view ConnectionManagerImpl::PrematureResetTotalStreamCountKey = ++ "overload.premature_reset_total_stream_count"; ++const absl::string_view ConnectionManagerImpl::PrematureResetMinStreamLifetimeSecondsKey = ++ "overload.premature_reset_min_stream_lifetime_seconds"; ++ + bool requestWasConnect(const RequestHeaderMapSharedPtr& headers, Protocol protocol) { + if (!headers) { + return false; +@@ -267,6 +273,12 @@ void ConnectionManagerImpl::doEndStream(ActiveStream& stream, bool check_for_def + } + + void ConnectionManagerImpl::doDeferredStreamDestroy(ActiveStream& stream) { ++ if (!stream.state_.is_internally_destroyed_) { ++ ++closed_non_internally_destroyed_requests_; ++ if (isPrematureRstStream(stream)) { ++ ++number_premature_stream_resets_; ++ } ++ } + if (stream.max_stream_duration_timer_ != nullptr) { + stream.max_stream_duration_timer_->disableTimer(); + stream.max_stream_duration_timer_ = nullptr; +@@ -343,6 +355,7 @@ void ConnectionManagerImpl::doDeferredStreamDestroy(ActiveStream& stream) { + if (connection_idle_timer_ && streams_.empty()) { + connection_idle_timer_->enableTimer(config_.idleTimeout().value()); + } ++ maybeDrainDueToPrematureResets(); + } + + RequestDecoder& ConnectionManagerImpl::newStream(ResponseEncoder& response_encoder, +@@ -607,6 +620,58 @@ void ConnectionManagerImpl::doConnectionClose( + } + } + ++bool ConnectionManagerImpl::isPrematureRstStream(const ActiveStream& stream) const { ++ // Check if the request was prematurely reset, by comparing its lifetime to the configured ++ // threshold. ++ ASSERT(!stream.state_.is_internally_destroyed_); ++ absl::optional<std::chrono::nanoseconds> duration = ++ stream.filter_manager_.streamInfo().currentDuration(); ++ ++ // Check if request lifetime is longer than the premature reset threshold. ++ if (duration) { ++ const uint64_t lifetime = std::chrono::duration_cast<std::chrono::seconds>(*duration).count(); ++ const uint64_t min_lifetime = runtime_.snapshot().getInteger( ++ ConnectionManagerImpl::PrematureResetMinStreamLifetimeSecondsKey, 1); ++ if (lifetime > min_lifetime) { ++ return false; ++ } ++ } ++ ++ // If request has completed before configured threshold, also check if the Envoy proxied the ++ // response from the upstream. Requests without the response status were reset. ++ // TODO(RyanTheOptimist): Possibly support half_closed_local instead. ++ return !stream.filter_manager_.streamInfo().responseCode(); ++} ++ ++// Sends a GOAWAY if too many streams have been reset prematurely on this ++// connection. ++void ConnectionManagerImpl::maybeDrainDueToPrematureResets() { ++ if (!Runtime::runtimeFeatureEnabled( ++ "envoy.restart_features.send_goaway_for_premature_rst_streams") || ++ closed_non_internally_destroyed_requests_ == 0) { ++ return; ++ } ++ ++ const uint64_t limit = ++ runtime_.snapshot().getInteger(ConnectionManagerImpl::PrematureResetTotalStreamCountKey, 500); ++ ++ if (closed_non_internally_destroyed_requests_ < limit) { ++ return; ++ } ++ ++ if (static_cast<double>(number_premature_stream_resets_) / ++ closed_non_internally_destroyed_requests_ < ++ .5) { ++ return; ++ } ++ ++ if (drain_state_ == DrainState::NotDraining) { ++ stats_.named_.downstream_rq_too_many_premature_resets_.inc(); ++ doConnectionClose(Network::ConnectionCloseType::Abort, absl::nullopt, ++ "too_many_premature_resets"); ++ } ++} ++ + void ConnectionManagerImpl::onGoAway(GoAwayErrorCode) { + // Currently we do nothing with remote go away frames. In the future we can decide to no longer + // push resources if applicable. +diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h +index b82b1967a5115..dad494c953bc0 100644 +--- a/source/common/http/conn_manager_impl.h ++++ b/source/common/http/conn_manager_impl.h +@@ -115,6 +115,14 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>, + void setClearHopByHopResponseHeaders(bool value) { clear_hop_by_hop_response_headers_ = value; } + bool clearHopByHopResponseHeaders() const { return clear_hop_by_hop_response_headers_; } + ++ // This runtime key configures the number of streams which must be closed on a connection before ++ // envoy will potentially drain a connection due to excessive prematurely reset streams. ++ static const absl::string_view PrematureResetTotalStreamCountKey; ++ ++ // The minimum lifetime of a stream, in seconds, in order not to be considered ++ // prematurely closed. ++ static const absl::string_view PrematureResetMinStreamLifetimeSecondsKey; ++ + private: + struct ActiveStream; + class MobileConnectionManagerImpl; +@@ -544,6 +552,15 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>, + void doConnectionClose(absl::optional<Network::ConnectionCloseType> close_type, + absl::optional<StreamInfo::ResponseFlag> response_flag, + absl::string_view details); ++ // Returns true if a RST_STREAM for the given stream is premature. Premature ++ // means the RST_STREAM arrived before response headers were sent and than ++ // the stream was alive for short period of time. This period is specified ++ // by the optional runtime value PrematureResetMinStreamLifetimeSecondsKey, ++ // or one second if that is not present. ++ bool isPrematureRstStream(const ActiveStream& stream) const; ++ // Sends a GOAWAY if both sufficient streams have been closed on a connection ++ // and at least half have been prematurely reset? ++ void maybeDrainDueToPrematureResets(); + + enum class DrainState { NotDraining, Draining, Closing }; + +@@ -584,6 +601,12 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>, + bool clear_hop_by_hop_response_headers_{true}; + // The number of requests accumulated on the current connection. + uint64_t accumulated_requests_{}; ++ // The number of requests closed on the current connection which were ++ // not internally destroyed ++ uint64_t closed_non_internally_destroyed_requests_{}; ++ // The number of requests that received a premature RST_STREAM, according to ++ // the definition given in `isPrematureRstStream()`. ++ uint64_t number_premature_stream_resets_{0}; + const std::string proxy_name_; // for Proxy-Status. + + const bool refresh_rtt_after_request_{}; +diff --git a/source/common/runtime/runtime_features.cc b/source/common/runtime/runtime_features.cc +index 8b75d6bbbfb1c..60d594c964319 100644 +--- a/source/common/runtime/runtime_features.cc ++++ b/source/common/runtime/runtime_features.cc +@@ -91,6 +91,7 @@ RUNTIME_GUARD(envoy_reloadable_features_validate_grpc_header_before_log_grpc_sta + RUNTIME_GUARD(envoy_reloadable_features_validate_upstream_headers); + RUNTIME_GUARD(envoy_restart_features_explicit_wildcard_resource); + RUNTIME_GUARD(envoy_restart_features_remove_runtime_singleton); ++RUNTIME_GUARD(envoy_restart_features_send_goaway_for_premature_rst_streams); + RUNTIME_GUARD(envoy_restart_features_udp_read_normalize_addresses); + RUNTIME_GUARD(envoy_restart_features_use_apple_api_for_dns_lookups); + +diff --git a/test/integration/multiplexed_integration_test.cc b/test/integration/multiplexed_integration_test.cc +index 190e57065d1c6..b4260ff3fa9ec 100644 +--- a/test/integration/multiplexed_integration_test.cc ++++ b/test/integration/multiplexed_integration_test.cc +@@ -1,4 +1,5 @@ + #include <algorithm> ++#include <chrono> + #include <memory> + #include <string> + +@@ -25,6 +26,7 @@ + #include "test/mocks/http/mocks.h" + #include "test/test_common/network_utility.h" + #include "test/test_common/printers.h" ++#include "test/test_common/simulated_time_system.h" + #include "test/test_common/utility.h" + + #include "gtest/gtest.h" +@@ -92,6 +94,15 @@ INSTANTIATE_TEST_SUITE_P(IpVersions, MultiplexedIntegrationTest, + {Http::CodecType::HTTP1})), + HttpProtocolIntegrationTest::protocolTestParamsToString); + ++class MultiplexedIntegrationTestWithSimulatedTime : public Event::TestUsingSimulatedTime, ++ public MultiplexedIntegrationTest {}; ++ ++INSTANTIATE_TEST_SUITE_P(IpVersions, MultiplexedIntegrationTestWithSimulatedTime, ++ testing::ValuesIn(HttpProtocolIntegrationTest::getProtocolTestParams( ++ {Http::CodecType::HTTP2, Http::CodecType::HTTP3}, ++ {Http::CodecType::HTTP1})), ++ HttpProtocolIntegrationTest::protocolTestParamsToString); ++ + TEST_P(MultiplexedIntegrationTest, RouterRequestAndResponseWithBodyNoBuffer) { + testRouterRequestAndResponseWithBody(1024, 512, false, false); + } +@@ -1076,6 +1087,67 @@ TEST_P(MultiplexedIntegrationTest, GoAway) { + EXPECT_EQ("200", response->headers().getStatusValue()); + } + ++// TODO(rch): Add a unit test which covers internal redirect handling. ++TEST_P(MultiplexedIntegrationTestWithSimulatedTime, GoAwayAfterTooManyResets) { ++ EXCLUDE_DOWNSTREAM_HTTP3; // Need to wait for the server to reset the stream ++ // before opening new one. ++ config_helper_.addRuntimeOverride("envoy.restart_features.send_goaway_for_premature_rst_streams", ++ "true"); ++ const int total_streams = 100; ++ config_helper_.addRuntimeOverride("overload.premature_reset_total_stream_count", ++ absl::StrCat(total_streams)); ++ initialize(); ++ ++ Http::TestRequestHeaderMapImpl headers{ ++ {":method", "GET"}, {":path", "/healthcheck"}, {":scheme", "http"}, {":authority", "host"}}; ++ codec_client_ = makeHttpConnection(lookupPort("http")); ++ for (int i = 0; i < total_streams; ++i) { ++ auto encoder_decoder = codec_client_->startRequest(headers); ++ request_encoder_ = &encoder_decoder.first; ++ auto response = std::move(encoder_decoder.second); ++ codec_client_->sendReset(*request_encoder_); ++ ASSERT_TRUE(response->waitForReset()); ++ } ++ ++ // Envoy should disconnect client due to premature reset check ++ ASSERT_TRUE(codec_client_->waitForDisconnect()); ++ test_server_->waitForCounterEq("http.config_test.downstream_rq_rx_reset", total_streams); ++ test_server_->waitForCounterEq("http.config_test.downstream_rq_too_many_premature_resets", 1); ++} ++ ++TEST_P(MultiplexedIntegrationTestWithSimulatedTime, DontGoAwayAfterTooManyResetsForLongStreams) { ++ EXCLUDE_DOWNSTREAM_HTTP3; // Need to wait for the server to reset the stream ++ // before opening new one. ++ config_helper_.addRuntimeOverride("envoy.restart_features.send_goaway_for_premature_rst_streams", ++ "true"); ++ const int total_streams = 100; ++ const int stream_lifetime_seconds = 2; ++ config_helper_.addRuntimeOverride("overload.premature_reset_total_stream_count", ++ absl::StrCat(total_streams)); ++ ++ config_helper_.addRuntimeOverride("overload.premature_reset_min_stream_lifetime_seconds", ++ absl::StrCat(stream_lifetime_seconds)); ++ ++ initialize(); ++ ++ Http::TestRequestHeaderMapImpl headers{ ++ {":method", "GET"}, {":path", "/healthcheck"}, {":scheme", "http"}, {":authority", "host"}}; ++ codec_client_ = makeHttpConnection(lookupPort("http")); ++ ++ std::string request_counter = "http.config_test.downstream_rq_total"; ++ std::string reset_counter = "http.config_test.downstream_rq_rx_reset"; ++ for (int i = 0; i < total_streams * 2; ++i) { ++ auto encoder_decoder = codec_client_->startRequest(headers); ++ request_encoder_ = &encoder_decoder.first; ++ auto response = std::move(encoder_decoder.second); ++ test_server_->waitForCounterEq(request_counter, i + 1); ++ timeSystem().advanceTimeWait(std::chrono::seconds(2 * stream_lifetime_seconds)); ++ codec_client_->sendReset(*request_encoder_); ++ ASSERT_TRUE(response->waitForReset()); ++ test_server_->waitForCounterEq(reset_counter, i + 1); ++ } ++} ++ + TEST_P(MultiplexedIntegrationTest, Trailers) { testTrailers(1024, 2048, false, false); } + + TEST_P(MultiplexedIntegrationTest, TrailersGiantBody) { diff --git a/0004-limit-http-requests-per-io-cycle.patch b/0004-limit-http-requests-per-io-cycle.patch new file mode 100644 index 000000000000..702e799513ec --- /dev/null +++ b/0004-limit-http-requests-per-io-cycle.patch @@ -0,0 +1,821 @@ +From 2e4228b0ee73ae640c92e0974c91e251997a3d2f Mon Sep 17 00:00:00 2001 +From: Yan Avlasov <yavlasov@google.com> +Date: Sat, 30 Sep 2023 13:58:46 +0000 +Subject: [PATCH] Limit on the number of HTTP requests processed from a + connection in an I/O cycle + +Signed-off-by: Yan Avlasov <yavlasov@google.com> +--- + changelogs/current.yaml | 7 + + source/common/http/conn_manager_impl.cc | 89 ++++++- + source/common/http/conn_manager_impl.h | 25 +- + test/common/http/conn_manager_impl_test_2.cc | 245 ++++++++++++++++++ + .../http/conn_manager_impl_test_base.cc | 19 ++ + .../common/http/conn_manager_impl_test_base.h | 2 + + test/common/http/http2/http2_frame.cc | 12 +- + test/common/http/http2/http2_frame.h | 14 +- + .../multiplexed_integration_test.cc | 170 ++++++++++++ + 9 files changed, 569 insertions(+), 14 deletions(-) + +diff --git a/changelogs/current.yaml b/changelogs/current.yaml +index b5d45089f337..86d0eac2339f 100644 +--- a/changelogs/current.yaml ++++ b/changelogs/current.yaml +@@ -10,6 +10,13 @@ + resets is applied. The connection is disconnected if more than 50% of resets are premature. + Setting the runtime key ``envoy.restart_features.send_goaway_for_premature_rst_streams`` to ``false`` completely disables + this check. ++- area: http ++ change: | ++ Add runtime flag ``http.max_requests_per_io_cycle`` for setting the limit on the number of HTTP requests processed ++ from a single connection in a single I/O cycle. Requests over this limit are processed in subsequent I/O cycles. This ++ mitigates CPU starvation by connections that simultaneously send high number of requests by allowing requests from other ++ connections to make progress. This runtime value can be set to 1 in the presence of abusive HTTP/2 or HTTP/3 connections. ++ By default this limit is disabled. + - area: build + change: | + Moved the subset, ring_hash, and maglev LB code into extensions. If you use these load balancers and override +diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc +index 021c55bc4cca..bf0439708413 100644 +--- a/source/common/http/conn_manager_impl.cc ++++ b/source/common/http/conn_manager_impl.cc +@@ -60,6 +60,10 @@ const absl::string_view ConnectionManagerImpl::PrematureResetTotalStreamCountKey + "overload.premature_reset_total_stream_count"; + const absl::string_view ConnectionManagerImpl::PrematureResetMinStreamLifetimeSecondsKey = + "overload.premature_reset_min_stream_lifetime_seconds"; ++// Runtime key for maximum number of requests that can be processed from a single connection per ++// I/O cycle. Requests over this limit are deferred until the next I/O cycle. ++const absl::string_view ConnectionManagerImpl::MaxRequestsPerIoCycle = ++ "http.max_requests_per_io_cycle"; + + bool requestWasConnect(const RequestHeaderMapSharedPtr& headers, Protocol protocol) { + if (!headers) { +@@ -116,6 +120,8 @@ ConnectionManagerImpl::ConnectionManagerImpl(ConnectionManagerConfig& config, + /*node_id=*/local_info_.node().id(), + /*server_name=*/config_.serverName(), + /*proxy_status_config=*/config_.proxyStatusConfig())), ++ max_requests_during_dispatch_( ++ runtime_.snapshot().getInteger(ConnectionManagerImpl::MaxRequestsPerIoCycle, UINT32_MAX)), + refresh_rtt_after_request_( + Runtime::runtimeFeatureEnabled("envoy.reloadable_features.refresh_rtt_after_request")) {} + +@@ -128,6 +134,10 @@ const ResponseHeaderMap& ConnectionManagerImpl::continueHeader() { + void ConnectionManagerImpl::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) { + read_callbacks_ = &callbacks; + dispatcher_ = &callbacks.connection().dispatcher(); ++ if (max_requests_during_dispatch_ != UINT32_MAX) { ++ deferred_request_processing_callback_ = ++ dispatcher_->createSchedulableCallback([this]() -> void { onDeferredRequestProcessing(); }); ++ } + + stats_.named_.downstream_cx_total_.inc(); + stats_.named_.downstream_cx_active_.inc(); +@@ -454,6 +464,7 @@ void ConnectionManagerImpl::createCodec(Buffer::Instance& data) { + } + + Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool) { ++ requests_during_dispatch_count_ = 0; + if (!codec_) { + // Http3 codec should have been instantiated by now. + createCodec(data); +@@ -1366,7 +1377,12 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPt + traceRequest(); + } + +- filter_manager_.decodeHeaders(*request_headers_, end_stream); ++ if (!connection_manager_.shouldDeferRequestProxyingToNextIoCycle()) { ++ filter_manager_.decodeHeaders(*request_headers_, end_stream); ++ } else { ++ state_.deferred_to_next_io_iteration_ = true; ++ state_.deferred_end_stream_ = end_stream; ++ } + + // Reset it here for both global and overridden cases. + resetIdleTimer(); +@@ -1433,8 +1449,15 @@ void ConnectionManagerImpl::ActiveStream::decodeData(Buffer::Instance& data, boo + connection_manager_.read_callbacks_->connection().dispatcher()); + maybeEndDecode(end_stream); + filter_manager_.streamInfo().addBytesReceived(data.length()); +- +- filter_manager_.decodeData(data, end_stream); ++ if (!state_.deferred_to_next_io_iteration_) { ++ filter_manager_.decodeData(data, end_stream); ++ } else { ++ if (!deferred_data_) { ++ deferred_data_ = std::make_unique<Buffer::OwnedImpl>(); ++ } ++ deferred_data_->move(data); ++ state_.deferred_end_stream_ = end_stream; ++ } + } + + void ConnectionManagerImpl::ActiveStream::decodeTrailers(RequestTrailerMapPtr&& trailers) { +@@ -1450,7 +1473,9 @@ void ConnectionManagerImpl::ActiveStream::decodeTrailers(RequestTrailerMapPtr&& + return; + } + maybeEndDecode(true); +- filter_manager_.decodeTrailers(*request_trailers_); ++ if (!state_.deferred_to_next_io_iteration_) { ++ filter_manager_.decodeTrailers(*request_trailers_); ++ } + } + + void ConnectionManagerImpl::ActiveStream::decodeMetadata(MetadataMapPtr&& metadata_map) { +@@ -2158,5 +2183,61 @@ void ConnectionManagerImpl::ActiveStream::resetStream(Http::StreamResetReason, a + connection_manager_.doEndStream(*this); + } + ++bool ConnectionManagerImpl::ActiveStream::onDeferredRequestProcessing() { ++ // TODO(yanavlasov): Merge this with the filter manager continueIteration() method ++ if (!state_.deferred_to_next_io_iteration_) { ++ return false; ++ } ++ state_.deferred_to_next_io_iteration_ = false; ++ bool end_stream = ++ state_.deferred_end_stream_ && deferred_data_ == nullptr && request_trailers_ == nullptr; ++ filter_manager_.decodeHeaders(*request_headers_, end_stream); ++ if (end_stream) { ++ return true; ++ } ++ if (deferred_data_ != nullptr) { ++ end_stream = state_.deferred_end_stream_ && request_trailers_ == nullptr; ++ filter_manager_.decodeData(*deferred_data_, end_stream); ++ } ++ if (request_trailers_ != nullptr) { ++ filter_manager_.decodeTrailers(*request_trailers_); ++ } ++ return true; ++} ++ ++bool ConnectionManagerImpl::shouldDeferRequestProxyingToNextIoCycle() { ++ // Do not defer this stream if stream deferral is disabled ++ if (deferred_request_processing_callback_ == nullptr) { ++ return false; ++ } ++ // Defer this stream if there are already deferred streams, so they are not ++ // processed out of order ++ if (deferred_request_processing_callback_->enabled()) { ++ return true; ++ } ++ ++requests_during_dispatch_count_; ++ bool defer = requests_during_dispatch_count_ > max_requests_during_dispatch_; ++ if (defer) { ++ deferred_request_processing_callback_->scheduleCallbackNextIteration(); ++ } ++ return defer; ++} ++ ++void ConnectionManagerImpl::onDeferredRequestProcessing() { ++ requests_during_dispatch_count_ = 1; // 1 stream is always let through ++ // Streams are inserted at the head of the list. As such process deferred ++ // streams at the back of the list first. ++ for (auto reverse_iter = streams_.rbegin(); reverse_iter != streams_.rend();) { ++ auto& stream_ptr = *reverse_iter; ++ // Move the iterator to the next item in case the `onDeferredRequestProcessing` call removes the ++ // stream from the list. ++ ++reverse_iter; ++ bool was_deferred = stream_ptr->onDeferredRequestProcessing(); ++ if (was_deferred && shouldDeferRequestProxyingToNextIoCycle()) { ++ break; ++ } ++ } ++} ++ + } // namespace Http + } // namespace Envoy +diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h +index dad494c953bc..e79a6a81c082 100644 +--- a/source/common/http/conn_manager_impl.h ++++ b/source/common/http/conn_manager_impl.h +@@ -122,6 +122,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>, + // The minimum lifetime of a stream, in seconds, in order not to be considered + // prematurely closed. + static const absl::string_view PrematureResetMinStreamLifetimeSecondsKey; ++ static const absl::string_view MaxRequestsPerIoCycle; + + private: + struct ActiveStream; +@@ -345,7 +346,8 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>, + : codec_saw_local_complete_(false), codec_encode_complete_(false), + on_reset_stream_called_(false), is_zombie_stream_(false), saw_connection_close_(false), + successful_upgrade_(false), is_internally_destroyed_(false), +- is_internally_created_(false), is_tunneling_(false), decorated_propagate_(true) {} ++ is_internally_created_(false), is_tunneling_(false), decorated_propagate_(true), ++ deferred_to_next_io_iteration_(false) {} + + // It's possibly for the codec to see the completed response but not fully + // encode it. +@@ -371,6 +373,14 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>, + bool is_tunneling_ : 1; + + bool decorated_propagate_ : 1; ++ ++ // Indicates that sending headers to the filter manager is deferred to the ++ // next I/O cycle. If data or trailers are received when this flag is set ++ // they are deferred too. ++ // TODO(yanavlasov): encapsulate the entire state of deferred streams into a separate ++ // structure, so it can be atomically created and cleared. ++ bool deferred_to_next_io_iteration_ : 1; ++ bool deferred_end_stream_ : 1; + }; + + bool canDestroyStream() const { +@@ -418,6 +428,11 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>, + // HTTP connection manager configuration, then the entire connection is closed. + bool validateTrailers(); + ++ // Dispatch deferred headers, body and trailers to the filter manager. ++ // Return true if this stream was deferred and dispatched pending headers, body and trailers (if ++ // present). Return false if this stream was not deferred. ++ bool onDeferredRequestProcessing(); ++ + ConnectionManagerImpl& connection_manager_; + OptRef<const TracingConnectionManagerConfig> connection_manager_tracing_config_; + // TODO(snowp): It might make sense to move this to the FilterManager to avoid storing it in +@@ -504,6 +519,8 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>, + const Tracing::CustomTagMap* customTags() const override; + bool verbose() const override; + uint32_t maxPathTagLength() const override; ++ ++ std::unique_ptr<Buffer::OwnedImpl> deferred_data_; + }; + + using ActiveStreamPtr = std::unique_ptr<ActiveStream>; +@@ -562,6 +579,9 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>, + // and at least half have been prematurely reset? + void maybeDrainDueToPrematureResets(); + ++ bool shouldDeferRequestProxyingToNextIoCycle(); ++ void onDeferredRequestProcessing(); ++ + enum class DrainState { NotDraining, Draining, Closing }; + + ConnectionManagerConfig& config_; +@@ -608,6 +628,9 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>, + // the definition given in `isPrematureRstStream()`. + uint64_t number_premature_stream_resets_{0}; + const std::string proxy_name_; // for Proxy-Status. ++ uint32_t requests_during_dispatch_count_{0}; ++ const uint32_t max_requests_during_dispatch_{UINT32_MAX}; ++ Event::SchedulableCallbackPtr deferred_request_processing_callback_; + + const bool refresh_rtt_after_request_{}; + }; +diff --git a/test/common/http/conn_manager_impl_test_2.cc b/test/common/http/conn_manager_impl_test_2.cc +index 5daf2b1a45e6..078bd1d85ab7 100644 +--- a/test/common/http/conn_manager_impl_test_2.cc ++++ b/test/common/http/conn_manager_impl_test_2.cc +@@ -11,6 +11,7 @@ using testing::InvokeWithoutArgs; + using testing::Mock; + using testing::Ref; + using testing::Return; ++using testing::ReturnArg; + using testing::ReturnRef; + + namespace Envoy { +@@ -3767,5 +3768,249 @@ TEST_F(HttpConnectionManagerImplTest, NoProxyProtocolAdded) { + // Clean up. + filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose); + } ++ ++// Validate that deferred streams are processed with a variety of ++// headers, data and trailer arriving in the same I/O cycle ++TEST_F(HttpConnectionManagerImplTest, LimitWorkPerIOCycle) { ++ const int kRequestsSentPerIOCycle = 100; ++ EXPECT_CALL(runtime_.snapshot_, getInteger(_, _)).WillRepeatedly(ReturnArg<1>()); ++ // Process 1 request per I/O cycle ++ auto* deferred_request_callback = enableStreamsPerIoLimit(1); ++ setup(false, ""); ++ ++ // Store the basic request encoder during filter chain setup. ++ std::vector<std::shared_ptr<MockStreamDecoderFilter>> encoder_filters; ++ int decode_headers_call_count = 0; ++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { ++ std::shared_ptr<MockStreamDecoderFilter> filter(new NiceMock<MockStreamDecoderFilter>()); ++ ++ // Each 4th request is headers only ++ EXPECT_CALL(*filter, decodeHeaders(_, i % 4 == 0 ? true : false)) ++ .WillRepeatedly(Invoke([&](RequestHeaderMap&, bool) -> FilterHeadersStatus { ++ ++decode_headers_call_count; ++ return FilterHeadersStatus::StopIteration; ++ })); ++ ++ // Each 1st request is headers and data only ++ // Each 2nd request is headers, data and trailers ++ if (i % 4 == 1 || i % 4 == 2) { ++ EXPECT_CALL(*filter, decodeData(_, i % 4 == 1 ? true : false)) ++ .WillOnce(Return(FilterDataStatus::StopIterationNoBuffer)); ++ } ++ ++ // Each 3rd request is headers and trailers (no data) ++ if (i % 4 == 2 || i % 4 == 3) { ++ EXPECT_CALL(*filter, decodeTrailers(_)).WillOnce(Return(FilterTrailersStatus::StopIteration)); ++ } ++ ++ EXPECT_CALL(*filter, setDecoderFilterCallbacks(_)); ++ encoder_filters.push_back(std::move(filter)); ++ } ++ ++ uint64_t random_value = 0; ++ EXPECT_CALL(random_, random()).WillRepeatedly(Invoke([&random_value]() { ++ return random_value++; ++ })); ++ ++ EXPECT_CALL(filter_factory_, createFilterChain(_)) ++ .Times(kRequestsSentPerIOCycle) ++ .WillRepeatedly(Invoke([&encoder_filters](FilterChainManager& manager) -> bool { ++ static int index = 0; ++ int i = index++; ++ FilterFactoryCb factory([&encoder_filters, i](FilterChainFactoryCallbacks& callbacks) { ++ callbacks.addStreamDecoderFilter(encoder_filters[i]); ++ }); ++ manager.applyFilterFactoryCb({}, factory); ++ return true; ++ })); ++ ++ EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, deferredDelete_(_)) ++ .Times(kRequestsSentPerIOCycle); ++ ++ std::vector<NiceMock<MockResponseEncoder>> response_encoders(kRequestsSentPerIOCycle); ++ for (auto& encoder : response_encoders) { ++ EXPECT_CALL(encoder, getStream()).WillRepeatedly(ReturnRef(encoder.stream_)); ++ } ++ ++ EXPECT_CALL(*codec_, dispatch(_)) ++ .WillRepeatedly(Invoke([&](Buffer::Instance& data) -> Http::Status { ++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { ++ decoder_ = &conn_manager_->newStream(response_encoders[i]); ++ ++ RequestHeaderMapPtr headers{new TestRequestHeaderMapImpl{ ++ {":authority", "host"}, {":path", "/"}, {":method", "GET"}}}; ++ ++ RequestTrailerMapPtr trailers{ ++ new TestRequestTrailerMapImpl{{"key1", "value1"}, {"key2", "value2"}}}; ++ ++ Buffer::OwnedImpl data("data"); ++ ++ switch (i % 4) { ++ case 0: ++ decoder_->decodeHeaders(std::move(headers), true); ++ break; ++ case 1: ++ decoder_->decodeHeaders(std::move(headers), false); ++ decoder_->decodeData(data, true); ++ break; ++ case 2: ++ decoder_->decodeHeaders(std::move(headers), false); ++ decoder_->decodeData(data, false); ++ decoder_->decodeTrailers(std::move(trailers)); ++ break; ++ case 3: ++ decoder_->decodeHeaders(std::move(headers), false); ++ decoder_->decodeTrailers(std::move(trailers)); ++ break; ++ } ++ } ++ ++ data.drain(4); ++ return Http::okStatus(); ++ })); ++ ++ // Kick off the incoming data. ++ Buffer::OwnedImpl fake_input("1234"); ++ conn_manager_->onData(fake_input, false); ++ ++ EXPECT_TRUE(deferred_request_callback->enabled_); ++ // Only one request should go through the filter chain ++ ASSERT_EQ(decode_headers_call_count, 1); ++ ++ // Let other requests to go through the filter chain. Call expectations will fail ++ // if this is not the case. ++ int deferred_request_count = 0; ++ while (deferred_request_callback->enabled_) { ++ deferred_request_callback->invokeCallback(); ++ ++deferred_request_count; ++ } ++ ++ ASSERT_EQ(deferred_request_count, kRequestsSentPerIOCycle); ++ ++ for (auto& filter : encoder_filters) { ++ ResponseHeaderMapPtr response_headers{new TestResponseHeaderMapImpl{{":status", "200"}}}; ++ filter->callbacks_->streamInfo().setResponseCodeDetails(""); ++ filter->callbacks_->encodeHeaders(std::move(response_headers), true, "details"); ++ } ++ ++ EXPECT_EQ(kRequestsSentPerIOCycle, stats_.named_.downstream_rq_2xx_.value()); ++ EXPECT_EQ(kRequestsSentPerIOCycle, listener_stats_.downstream_rq_2xx_.value()); ++ EXPECT_EQ(kRequestsSentPerIOCycle, stats_.named_.downstream_rq_completed_.value()); ++ EXPECT_EQ(kRequestsSentPerIOCycle, listener_stats_.downstream_rq_completed_.value()); ++} ++ ++TEST_F(HttpConnectionManagerImplTest, StreamDeferralPreservesOrder) { ++ EXPECT_CALL(runtime_.snapshot_, getInteger(_, _)).WillRepeatedly(ReturnArg<1>()); ++ // Process 1 request per I/O cycle ++ auto* deferred_request_callback = enableStreamsPerIoLimit(1); ++ setup(false, ""); ++ ++ std::vector<std::shared_ptr<MockStreamDecoderFilter>> encoder_filters; ++ int expected_request_id = 0; ++ const Http::LowerCaseString request_id_header(absl::string_view("request-id")); ++ // Two requests are processed in 2 I/O reads ++ const int TotalRequests = 2 * 2; ++ for (int i = 0; i < TotalRequests; ++i) { ++ std::shared_ptr<MockStreamDecoderFilter> filter(new NiceMock<MockStreamDecoderFilter>()); ++ ++ EXPECT_CALL(*filter, decodeHeaders(_, true)) ++ .WillRepeatedly(Invoke([&](RequestHeaderMap& headers, bool) -> FilterHeadersStatus { ++ // Check that requests are decoded in expected order ++ int request_id = 0; ++ ASSERT(absl::SimpleAtoi(headers.get(request_id_header)[0]->value().getStringView(), ++ &request_id)); ++ ASSERT(request_id == expected_request_id); ++ ++expected_request_id; ++ return FilterHeadersStatus::StopIteration; ++ })); ++ ++ EXPECT_CALL(*filter, setDecoderFilterCallbacks(_)); ++ encoder_filters.push_back(std::move(filter)); ++ } ++ ++ uint64_t random_value = 0; ++ EXPECT_CALL(random_, random()).WillRepeatedly(Invoke([&random_value]() { ++ return random_value++; ++ })); ++ ++ EXPECT_CALL(filter_factory_, createFilterChain(_)) ++ .Times(TotalRequests) ++ .WillRepeatedly(Invoke([&encoder_filters](FilterChainManager& manager) -> bool { ++ static int index = 0; ++ int i = index++; ++ FilterFactoryCb factory([&encoder_filters, i](FilterChainFactoryCallbacks& callbacks) { ++ callbacks.addStreamDecoderFilter(encoder_filters[i]); ++ }); ++ manager.applyFilterFactoryCb({}, factory); ++ return true; ++ })); ++ ++ EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, deferredDelete_(_)).Times(TotalRequests); ++ ++ std::vector<NiceMock<MockResponseEncoder>> response_encoders(TotalRequests); ++ for (auto& encoder : response_encoders) { ++ EXPECT_CALL(encoder, getStream()).WillRepeatedly(ReturnRef(encoder.stream_)); ++ } ++ auto response_encoders_iter = response_encoders.begin(); ++ ++ int request_id = 0; ++ EXPECT_CALL(*codec_, dispatch(_)) ++ .WillRepeatedly(Invoke([&](Buffer::Instance& data) -> Http::Status { ++ // The second request should be deferred ++ for (int i = 0; i < 2; ++i) { ++ decoder_ = &conn_manager_->newStream(*response_encoders_iter); ++ ++response_encoders_iter; ++ ++ RequestHeaderMapPtr headers{ ++ new TestRequestHeaderMapImpl{{":authority", "host"}, ++ {":path", "/"}, ++ {":method", "GET"}, ++ {"request-id", absl::StrCat(request_id)}}}; ++ ++ ++request_id; ++ decoder_->decodeHeaders(std::move(headers), true); ++ } ++ ++ data.drain(4); ++ return Http::okStatus(); ++ })); ++ ++ // Kick off the incoming data. ++ Buffer::OwnedImpl fake_input("1234"); ++ conn_manager_->onData(fake_input, false); ++ ++ EXPECT_TRUE(deferred_request_callback->enabled_); ++ // Only one request should go through the filter chain ++ ASSERT_EQ(expected_request_id, 1); ++ ++ // Test arrival of another request. New request is read from the socket before deferred callbacks. ++ Buffer::OwnedImpl fake_input2("1234"); ++ conn_manager_->onData(fake_input2, false); ++ ++ // No requests from the second read should go through as there are deferred stream present ++ ASSERT_EQ(expected_request_id, 1); ++ ++ // Let other requests to go through the filter chain. Call expectations will fail ++ // if this is not the case. ++ int deferred_request_count = 0; ++ while (deferred_request_callback->enabled_) { ++ deferred_request_callback->invokeCallback(); ++ ++deferred_request_count; ++ } ++ ++ ASSERT_EQ(deferred_request_count, TotalRequests); ++ ++ for (auto& filter : encoder_filters) { ++ ResponseHeaderMapPtr response_headers{new TestResponseHeaderMapImpl{{":status", "200"}}}; ++ filter->callbacks_->streamInfo().setResponseCodeDetails(""); ++ filter->callbacks_->encodeHeaders(std::move(response_headers), true, "details"); ++ } ++ ++ EXPECT_EQ(TotalRequests, stats_.named_.downstream_rq_2xx_.value()); ++ EXPECT_EQ(TotalRequests, listener_stats_.downstream_rq_2xx_.value()); ++ EXPECT_EQ(TotalRequests, stats_.named_.downstream_rq_completed_.value()); ++ EXPECT_EQ(TotalRequests, listener_stats_.downstream_rq_completed_.value()); ++} ++ + } // namespace Http + } // namespace Envoy +diff --git a/test/common/http/conn_manager_impl_test_base.cc b/test/common/http/conn_manager_impl_test_base.cc +index 1c5053246e44..641ca6fae082 100644 +--- a/test/common/http/conn_manager_impl_test_base.cc ++++ b/test/common/http/conn_manager_impl_test_base.cc +@@ -78,6 +78,7 @@ void HttpConnectionManagerImplMixin::setup(bool ssl, const std::string& server_n + conn_manager_ = std::make_unique<ConnectionManagerImpl>( + *this, drain_close_, random_, http_context_, runtime_, local_info_, cluster_manager_, + overload_manager_, test_time_.timeSystem()); ++ + conn_manager_->initializeReadFilterCallbacks(filter_callbacks_); + + if (tracing) { +@@ -370,5 +371,23 @@ void HttpConnectionManagerImplMixin::expectUhvTrailerCheck( + })); + } + ++Event::MockSchedulableCallback* ++HttpConnectionManagerImplMixin::enableStreamsPerIoLimit(uint32_t limit) { ++ EXPECT_CALL(runtime_.snapshot_, getInteger("http.max_requests_per_io_cycle", _)) ++ .WillOnce(Return(limit)); ++ ++ // Expect HCM to create and set schedulable callback ++ auto* deferred_request_callback = ++ new Event::MockSchedulableCallback(&filter_callbacks_.connection_.dispatcher_); ++ EXPECT_CALL(*deferred_request_callback, enabled()) ++ .WillRepeatedly( ++ Invoke([deferred_request_callback]() { return deferred_request_callback->enabled_; })); ++ EXPECT_CALL(*deferred_request_callback, scheduleCallbackNextIteration()) ++ .WillRepeatedly( ++ Invoke([deferred_request_callback]() { deferred_request_callback->enabled_ = true; })); ++ ++ return deferred_request_callback; ++} ++ + } // namespace Http + } // namespace Envoy +diff --git a/test/common/http/conn_manager_impl_test_base.h b/test/common/http/conn_manager_impl_test_base.h +index 99fdc9344118..10a4ff15b54a 100644 +--- a/test/common/http/conn_manager_impl_test_base.h ++++ b/test/common/http/conn_manager_impl_test_base.h +@@ -202,6 +202,8 @@ class HttpConnectionManagerImplMixin : public ConnectionManagerConfig { + HeaderValidator::TransformationResult transformation_result, + bool expect_response = true); + ++ Event::MockSchedulableCallback* enableStreamsPerIoLimit(uint32_t limit); ++ + Envoy::Event::SimulatedTimeSystem test_time_; + NiceMock<Router::MockRouteConfigProvider> route_config_provider_; + std::shared_ptr<Router::MockConfig> route_config_{new NiceMock<Router::MockConfig>()}; +diff --git a/test/common/http/http2/http2_frame.cc b/test/common/http/http2/http2_frame.cc +index c5172938a804..46ba3751ba24 100644 +--- a/test/common/http/http2/http2_frame.cc ++++ b/test/common/http/http2/http2_frame.cc +@@ -341,7 +341,11 @@ Http2Frame Http2Frame::makeRequest(uint32_t stream_index, absl::string_view host + makeNetworkOrderStreamId(stream_index)); + frame.appendStaticHeader(StaticHeaderIndex::MethodGet); + frame.appendStaticHeader(StaticHeaderIndex::SchemeHttps); +- frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Path, path); ++ if (path.empty() || path == "/") { ++ frame.appendStaticHeader(StaticHeaderIndex::Path); ++ } else { ++ frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Path, path); ++ } + frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Authority, host); + frame.adjustPayloadSize(); + return frame; +@@ -365,7 +369,11 @@ Http2Frame Http2Frame::makePostRequest(uint32_t stream_index, absl::string_view + makeNetworkOrderStreamId(stream_index)); + frame.appendStaticHeader(StaticHeaderIndex::MethodPost); + frame.appendStaticHeader(StaticHeaderIndex::SchemeHttps); +- frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Path, path); ++ if (path.empty() || path == "/") { ++ frame.appendStaticHeader(StaticHeaderIndex::Path); ++ } else { ++ frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Path, path); ++ } + frame.appendHeaderWithoutIndexing(StaticHeaderIndex::Authority, host); + frame.adjustPayloadSize(); + return frame; +diff --git a/test/common/http/http2/http2_frame.h b/test/common/http/http2/http2_frame.h +index 7fdf510ea256..5df3375c1661 100644 +--- a/test/common/http/http2/http2_frame.h ++++ b/test/common/http/http2/http2_frame.h +@@ -253,6 +253,13 @@ class Http2Frame { + ConstIterator end() const { return data_.end(); } + bool empty() const { return data_.empty(); } + ++ void appendHeaderWithoutIndexing(const Header& header); ++ // This method updates payload length in the HTTP2 header based on the size of the data_ ++ void adjustPayloadSize() { ++ ASSERT(size() >= HeaderSize); ++ setPayloadSize(size() - HeaderSize); ++ } ++ + private: + void buildHeader(Type type, uint32_t payload_size = 0, uint8_t flags = 0, uint32_t stream_id = 0); + void setPayloadSize(uint32_t size); +@@ -272,15 +279,8 @@ class Http2Frame { + // Headers are directly encoded + void appendStaticHeader(StaticHeaderIndex index); + void appendHeaderWithoutIndexing(StaticHeaderIndex index, absl::string_view value); +- void appendHeaderWithoutIndexing(const Header& header); + void appendEmptyHeader(); + +- // This method updates payload length in the HTTP2 header based on the size of the data_ +- void adjustPayloadSize() { +- ASSERT(size() >= HeaderSize); +- setPayloadSize(size() - HeaderSize); +- } +- + DataContainer data_; + }; + +diff --git a/test/integration/multiplexed_integration_test.cc b/test/integration/multiplexed_integration_test.cc +index b4260ff3fa9e..edce9a1838d9 100644 +--- a/test/integration/multiplexed_integration_test.cc ++++ b/test/integration/multiplexed_integration_test.cc +@@ -2130,6 +2130,176 @@ TEST_P(Http2FrameIntegrationTest, AccessLogOfWireBytesIfResponseSizeGreaterThanW + tcp_client_->close(); + } + ++TEST_P(Http2FrameIntegrationTest, MultipleHeaderOnlyRequests) { ++ const int kRequestsSentPerIOCycle = 20; ++ autonomous_upstream_ = true; ++ config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1"); ++ beginSession(); ++ ++ std::string buffer; ++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { ++ auto request = Http2Frame::makeRequest(Http2Frame::makeClientStreamId(i), "a", "/", ++ {{"response_data_blocks", "0"}, {"no_trailers", "1"}}); ++ absl::StrAppend(&buffer, std::string(request)); ++ } ++ ++ ASSERT_TRUE(tcp_client_->write(buffer, false, false)); ++ ++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { ++ auto frame = readFrame(); ++ EXPECT_EQ(Http2Frame::Type::Headers, frame.type()); ++ EXPECT_EQ(Http2Frame::ResponseStatus::Ok, frame.responseStatus()); ++ } ++ tcp_client_->close(); ++} ++ ++TEST_P(Http2FrameIntegrationTest, MultipleRequests) { ++ const int kRequestsSentPerIOCycle = 20; ++ autonomous_upstream_ = true; ++ config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1"); ++ beginSession(); ++ ++ std::string buffer; ++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { ++ auto request = ++ Http2Frame::makePostRequest(Http2Frame::makeClientStreamId(i), "a", "/", ++ {{"response_data_blocks", "0"}, {"no_trailers", "1"}}); ++ absl::StrAppend(&buffer, std::string(request)); ++ } ++ ++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { ++ auto data = Http2Frame::makeDataFrame(Http2Frame::makeClientStreamId(i), "a", ++ Http2Frame::DataFlags::EndStream); ++ absl::StrAppend(&buffer, std::string(data)); ++ } ++ ++ ASSERT_TRUE(tcp_client_->write(buffer, false, false)); ++ ++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { ++ auto frame = readFrame(); ++ EXPECT_EQ(Http2Frame::Type::Headers, frame.type()); ++ EXPECT_EQ(Http2Frame::ResponseStatus::Ok, frame.responseStatus()); ++ } ++ tcp_client_->close(); ++} ++ ++TEST_P(Http2FrameIntegrationTest, MultipleRequestsWithTrailers) { ++ const int kRequestsSentPerIOCycle = 20; ++ autonomous_upstream_ = true; ++ config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1"); ++ beginSession(); ++ ++ std::string buffer; ++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { ++ auto request = ++ Http2Frame::makePostRequest(Http2Frame::makeClientStreamId(i), "a", "/", ++ {{"response_data_blocks", "0"}, {"no_trailers", "1"}}); ++ absl::StrAppend(&buffer, std::string(request)); ++ } ++ ++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { ++ auto data = Http2Frame::makeDataFrame(Http2Frame::makeClientStreamId(i), "a"); ++ absl::StrAppend(&buffer, std::string(data)); ++ } ++ ++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { ++ auto trailers = Http2Frame::makeEmptyHeadersFrame( ++ Http2Frame::makeClientStreamId(i), ++ static_cast<Http2Frame::HeadersFlags>(Http::Http2::orFlags( ++ Http2Frame::HeadersFlags::EndStream, Http2Frame::HeadersFlags::EndHeaders))); ++ trailers.appendHeaderWithoutIndexing({"k", "v"}); ++ trailers.adjustPayloadSize(); ++ absl::StrAppend(&buffer, std::string(trailers)); ++ } ++ ++ ASSERT_TRUE(tcp_client_->write(buffer, false, false)); ++ ++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { ++ auto frame = readFrame(); ++ EXPECT_EQ(Http2Frame::Type::Headers, frame.type()); ++ EXPECT_EQ(Http2Frame::ResponseStatus::Ok, frame.responseStatus()); ++ } ++ tcp_client_->close(); ++} ++ ++TEST_P(Http2FrameIntegrationTest, MultipleHeaderOnlyRequestsFollowedByReset) { ++ // This number of requests stays below premature reset detection. ++ const int kRequestsSentPerIOCycle = 20; ++ config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1"); ++ beginSession(); ++ ++ std::string buffer; ++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { ++ auto request = Http2Frame::makeRequest(Http2Frame::makeClientStreamId(i), "a", "/", ++ {{"response_data_blocks", "0"}, {"no_trailers", "1"}}); ++ absl::StrAppend(&buffer, std::string(request)); ++ } ++ ++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { ++ auto reset = Http2Frame::makeResetStreamFrame(Http2Frame::makeClientStreamId(i), ++ Http2Frame::ErrorCode::Cancel); ++ absl::StrAppend(&buffer, std::string(reset)); ++ } ++ ++ ASSERT_TRUE(tcp_client_->write(buffer, false, false)); ++ test_server_->waitForCounterEq("http.config_test.downstream_rq_rx_reset", ++ kRequestsSentPerIOCycle); ++ // Client should remain connected ++ ASSERT_TRUE(tcp_client_->connected()); ++ tcp_client_->close(); ++} ++ ++TEST_P(Http2FrameIntegrationTest, ResettingDeferredRequestsTriggersPrematureResetCheck) { ++ const int kRequestsSentPerIOCycle = 20; ++ // Set premature stream count to the number of streams we are about to send ++ config_helper_.addRuntimeOverride("overload.premature_reset_total_stream_count", "20"); ++ config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1"); ++ beginSession(); ++ ++ std::string buffer; ++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { ++ auto request = Http2Frame::makeRequest(Http2Frame::makeClientStreamId(i), "a", "/", ++ {{"response_data_blocks", "0"}, {"no_trailers", "1"}}); ++ absl::StrAppend(&buffer, std::string(request)); ++ } ++ ++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { ++ auto reset = Http2Frame::makeResetStreamFrame(Http2Frame::makeClientStreamId(i), ++ Http2Frame::ErrorCode::Cancel); ++ absl::StrAppend(&buffer, std::string(reset)); ++ } ++ ++ ASSERT_TRUE(tcp_client_->write(buffer, false, false)); ++ // Envoy should close the client connection due to too many premature resets ++ tcp_client_->waitForDisconnect(); ++ test_server_->waitForCounterEq("http.config_test.downstream_rq_too_many_premature_resets", 1); ++} ++ ++TEST_P(Http2FrameIntegrationTest, CloseConnectionWithDeferredStreams) { ++ // Use large number of requests to ensure close is detected while there are ++ // still some deferred streams. ++ const int kRequestsSentPerIOCycle = 1000; ++ config_helper_.addRuntimeOverride("http.max_requests_per_io_cycle", "1"); ++ // Ensure premature reset detection does not get in the way ++ config_helper_.addRuntimeOverride("overload.premature_reset_total_stream_count", "1001"); ++ beginSession(); ++ ++ std::string buffer; ++ for (int i = 0; i < kRequestsSentPerIOCycle; ++i) { ++ auto request = Http2Frame::makeRequest(Http2Frame::makeClientStreamId(i), "a", "/", ++ {{"response_data_blocks", "0"}, {"no_trailers", "1"}}); ++ absl::StrAppend(&buffer, std::string(request)); ++ } ++ ++ ASSERT_TRUE(tcp_client_->write(buffer, false, false)); ++ ASSERT_TRUE(tcp_client_->connected()); ++ // Drop the downstream connection ++ tcp_client_->close(); ++ // Test that Envoy can clean-up deferred streams ++ test_server_->waitForCounterEq("http.config_test.downstream_rq_rx_reset", ++ kRequestsSentPerIOCycle); ++} ++ + INSTANTIATE_TEST_SUITE_P(IpVersions, Http2FrameIntegrationTest, + testing::ValuesIn(Http2FrameIntegrationTest::testParams()), + frameIntegrationTestParamToString); +--- envoy-1.27.0.a/changelogs/current.yaml 2023-10-10 11:36:11.553031312 -0400 ++++ envoy-1.27.0.b/changelogs/current.yaml 2023-10-10 11:37:56.601045565 -0400 |