diff --git a/api/envoy/config/listener/v3/listener.proto b/api/envoy/config/listener/v3/listener.proto index 9381d4eb7aca..4fb1d5146e54 100644 --- a/api/envoy/config/listener/v3/listener.proto +++ b/api/envoy/config/listener/v3/listener.proto @@ -12,6 +12,7 @@ import "envoy/config/listener/v3/listener_components.proto"; import "envoy/config/listener/v3/udp_listener_config.proto"; import "google/protobuf/duration.proto"; +import "google/protobuf/any.proto"; import "google/protobuf/wrappers.proto"; import "xds/annotations/v3/status.proto"; @@ -53,7 +54,7 @@ message ListenerCollection { repeated xds.core.v3.CollectionEntry entries = 1; } -// [#next-free-field: 36] +// [#next-free-field: 37] message Listener { option (udpa.annotations.versioning).previous_message_type = "envoy.api.v2.Listener"; @@ -378,6 +379,11 @@ message Listener { // * :ref:`freebind ` // * :ref:`transparent ` InternalListenerConfig internal_listener = 27; + + // Used to represent a reverse connection listener which, instead of binding to a port and listening, + // initiates reverse connections to a remote endpoint. The used sockets are cached on the remote + // endpoint and can be used to send request to local services. + google.protobuf.Any reverse_connection_listener_config = 36; } // Enable MPTCP (multi-path TCP) on this listener. Clients will be allowed to establish diff --git a/envoy/http/filter.h b/envoy/http/filter.h index f1283df2868a..a40c718c02d4 100644 --- a/envoy/http/filter.h +++ b/envoy/http/filter.h @@ -820,6 +820,11 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks { * @return true if the filter should shed load based on the system pressure, typically memory. */ virtual bool shouldLoadShed() const PURE; + + /** + * @return set a flag to send a local reply immediately for reverse connections. + */ + virtual void setReverseConnForceLocalReply(bool value) PURE; }; /** diff --git a/envoy/network/connection.h b/envoy/network/connection.h index 340ce317c28f..a9bfd10cfeff 100644 --- a/envoy/network/connection.h +++ b/envoy/network/connection.h @@ -318,6 +318,22 @@ class Connection : public Event::DeferredDeletable, */ virtual bool aboveHighWatermark() const PURE; + /** + * @return ConnectionSocketPtr& To get socket from current connection. + */ + virtual const ConnectionSocketPtr& getSocket() const PURE; + + /** + * Mark a connection as a reverse connection. The socket + * is cached and re-used for serving downstream requests. + */ + virtual void setConnectionReused(bool value) PURE; + + /** + * return true if active connection (listener) is reused. + */ + virtual bool isConnectionReused() PURE; + /** * Get the socket options set on this connection. */ diff --git a/envoy/network/connection_handler.h b/envoy/network/connection_handler.h index ed665428b630..6f71132103c4 100644 --- a/envoy/network/connection_handler.h +++ b/envoy/network/connection_handler.h @@ -19,6 +19,8 @@ namespace Envoy { namespace Network { +class LocalRevConnRegistry; + // This interface allows for a listener to perform an alternative behavior when a // packet can't be routed correctly during draining; for example QUIC packets that // are not for an existing connection. @@ -127,6 +129,20 @@ class ConnectionHandler { */ virtual const std::string& statPrefix() const PURE; + /** + * Pass the reverse connection socket to the listener that initiated it. + * @param upstream_socket the socket to be passed. + * @param listener_tag the tag of the listener that initiated the reverse + * connection. + */ + virtual void saveUpstreamConnection(Network::ConnectionSocketPtr&& upstream_socket, + uint64_t listener_tag) PURE; + + /** + * @return the thread local registry. + */ + virtual Network::LocalRevConnRegistry& reverseConnRegistry() const PURE; + /** * Used by ConnectionHandler to manage listeners. */ @@ -303,6 +319,28 @@ class InternalListener : public virtual ConnectionHandler::ActiveListener { using InternalListenerPtr = std::unique_ptr; using InternalListenerOptRef = OptRef; +/** + * Reverse connection listener callbacks. + */ +class ReverseConnectionListener : public virtual ConnectionHandler::ActiveListener { +public: + + /** + * Helper method that triggers the reverse connection workflow. + * @param dispatcher the thread local dispatcher. + * @param conn_handler the thread local connection handler. + * @param config the listener config that triggers the reverse connection. + */ + virtual void startRCWorkflow(Event::Dispatcher& dispatcher, Network::ConnectionHandler& conn_handler, Network::ListenerConfig& config) PURE; + + /** + * Called when a new connection is accepted. + * @param socket supplies the socket that is moved into the callee. + */ + virtual void onAccept(ConnectionSocketPtr&& socket) PURE; +}; +using ReverseConnectionListenerPtr = std::unique_ptr; + /** * The query interface of the registered internal listener callbacks. */ @@ -351,5 +389,35 @@ class InternalListenerRegistry { virtual LocalInternalListenerRegistry* getLocalRegistry() PURE; }; +// The thread local registry. +class LocalRevConnRegistry { +public: + virtual ~LocalRevConnRegistry() = default; + + virtual Network::ReverseConnectionListenerPtr + createActiveReverseConnectionListener(Network::ConnectionHandler& conn_handler, + Event::Dispatcher& dispatcher, + Network::ListenerConfig& config) PURE; +}; + +// The central reverse conn registry interface providing the thread local accessor. +class RevConnRegistry { +public: + virtual ~RevConnRegistry() = default; + + /** + * @return The thread local registry. + */ + virtual LocalRevConnRegistry* getLocalRegistry() PURE; + + /** + * Helper function to create a ReverseConnectionListenerConfig from a google.protobuf.Any. + * @param config is the reverse connection listener config. + * @return the ReverseConnectionListenerConfig object. + */ + virtual absl::StatusOr + fromAnyConfig(const google::protobuf::Any& config) PURE; +}; + } // namespace Network } // namespace Envoy diff --git a/envoy/network/filter.h b/envoy/network/filter.h index 02e016947b01..a56d57f7e323 100644 --- a/envoy/network/filter.h +++ b/envoy/network/filter.h @@ -386,6 +386,12 @@ class ListenerFilter { */ virtual FilterStatus onData(Network::ListenerFilterBuffer& buffer) PURE; + /** + * Called when the connection is closed. Only the current filter that has stopped filter + * chain iteration will get the callback. + */ + virtual void onClose(){}; + /** * Return the size of data the filter want to inspect from the connection. * The size can be increased after filter need to inspect more data. diff --git a/envoy/network/listen_socket.h b/envoy/network/listen_socket.h index 08f4a4d5a672..298f4356dc6d 100644 --- a/envoy/network/listen_socket.h +++ b/envoy/network/listen_socket.h @@ -94,7 +94,7 @@ class ConnectionSocket : public virtual Socket { virtual void dumpState(std::ostream& os, int indent_level = 0) const PURE; }; -using ConnectionSocketPtr = std::unique_ptr; +using ConnectionSocketPtr = std::shared_ptr; } // namespace Network } // namespace Envoy diff --git a/envoy/network/listener.h b/envoy/network/listener.h index 7a47caef201b..de148f7b6480 100644 --- a/envoy/network/listener.h +++ b/envoy/network/listener.h @@ -134,6 +134,42 @@ class InternalListenerConfig { using InternalListenerConfigOptRef = OptRef; +class RevConnRegistry; + +/** + * Configuration for a reverse connection listener. + */ +class ReverseConnectionListenerConfig { +public: + virtual ~ReverseConnectionListenerConfig() = default; + + /** + * Encapsulates the source node, cluster and tenant IDs of the local envoy. These are used when + * the listener creates reverse connections. + */ + struct ReverseConnParams { + std::string src_node_id_; + std::string src_cluster_id_; + std::string src_tenant_id_; + absl::flat_hash_map remote_cluster_to_conn_count_map_; + }; + using ReverseConnParamsPtr = std::unique_ptr; + /** + * @return the private ReverseConnParams object, containing + * the params identifying the local envoy. + */ + virtual ReverseConnParamsPtr& getReverseConnParams() PURE; + + /** + * @return the global reverse conn registry. + */ + virtual RevConnRegistry& reverseConnRegistry() PURE; +}; + +using ReverseConnectionListenerConfigOptRef = OptRef; +using ReverseConnectionListenerConfigPtr = std::unique_ptr; +using ReverseConnParamsPtr = Network::ReverseConnectionListenerConfig::ReverseConnParamsPtr; + /** * Description of the listener. */ @@ -260,6 +296,17 @@ class ListenerConfig { */ virtual InternalListenerConfigOptRef internalListenerConfig() PURE; + /** + * @return the reverse connection configuration for the listener IFF it is an reverse connection + * listener. + */ + virtual ReverseConnectionListenerConfigOptRef reverseConnectionListenerConfig() const PURE; + + /** + * @return the listener's version. + */ + virtual const std::string& versionInfo() const PURE; + /** * @param address is used for query the address specific connection balancer. * @return the connection balancer for this listener. All listeners have a connection balancer, diff --git a/envoy/upstream/host_description.h b/envoy/upstream/host_description.h index 09e6fc579e77..0f8164e54c7f 100644 --- a/envoy/upstream/host_description.h +++ b/envoy/upstream/host_description.h @@ -262,6 +262,17 @@ class HostDescription { */ virtual absl::optional lastHcPassTime() const PURE; + /** + * @return host-id to be used to retrieve reverse connection sockets from + * reverse connection handler. + */ + virtual const absl::string_view getHostId() const PURE; + + /* + * Set the current host-id. + */ + virtual void setHostId(const std::string host_id) PURE; + /** * Set the timestamp of when the host has transitioned from unhealthy to healthy state via an * active health checking. diff --git a/source/common/http/async_client_impl.h b/source/common/http/async_client_impl.h index ead93183f0cd..bc204ff98f17 100644 --- a/source/common/http/async_client_impl.h +++ b/source/common/http/async_client_impl.h @@ -156,6 +156,10 @@ class AsyncStreamImpl : public virtual AsyncClient::Stream, const StreamInfo::StreamInfo& streamInfo() const override { return stream_info_; } StreamInfo::StreamInfoImpl& streamInfo() override { return stream_info_; } + void setReverseConnForceLocalReply(bool value) override { + ENVOY_LOG_MISC(error, "Cannot set value {}. AsyncStreamImpl does not support reverse connection.", value); + } + protected: AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks, const AsyncClient::StreamOptions& options, absl::Status& creation_status); diff --git a/source/common/http/codec_client.h b/source/common/http/codec_client.h index af59af2d1582..be2b1e01dfb6 100644 --- a/source/common/http/codec_client.h +++ b/source/common/http/codec_client.h @@ -143,6 +143,8 @@ class CodecClient : protected Logger::Loggable, */ void connect(); + Network::ClientConnectionPtr& connection() { return connection_; } + bool connectCalled() const { return connect_called_; } protected: diff --git a/source/common/http/conn_pool_base.cc b/source/common/http/conn_pool_base.cc index f65427fbcc88..ec42493c9c81 100644 --- a/source/common/http/conn_pool_base.cc +++ b/source/common/http/conn_pool_base.cc @@ -189,10 +189,10 @@ uint64_t MultiplexedActiveClientBase::maxStreamsPerConnection(uint64_t max_strea MultiplexedActiveClientBase::MultiplexedActiveClientBase( HttpConnPoolImplBase& parent, uint32_t effective_concurrent_streams, uint32_t max_configured_concurrent_streams, Stats::Counter& cx_total, - OptRef data) + OptRef data, CreateConnectionDataFn connection_fn) : Envoy::Http::ActiveClient( parent, maxStreamsPerConnection(parent.host()->cluster().maxRequestsPerConnection()), - effective_concurrent_streams, max_configured_concurrent_streams, data) { + effective_concurrent_streams, max_configured_concurrent_streams, data, connection_fn) { codec_client_->setCodecClientCallbacks(*this); codec_client_->setCodecConnectionCallbacks(*this); cx_total.inc(); diff --git a/source/common/http/conn_pool_base.h b/source/common/http/conn_pool_base.h index 4facd3c82d5d..2e8940d66ccb 100644 --- a/source/common/http/conn_pool_base.h +++ b/source/common/http/conn_pool_base.h @@ -102,13 +102,22 @@ class HttpConnPoolImplBase : public Envoy::ConnectionPool::ConnPoolImplBase, absl::optional origin_; }; +using CreateConnectionDataFn = + std::function; // An implementation of Envoy::ConnectionPool::ActiveClient for HTTP/1.1 and HTTP/2 class ActiveClient : public Envoy::ConnectionPool::ActiveClient { public: - ActiveClient(HttpConnPoolImplBase& parent, uint64_t lifetime_stream_limit, - uint64_t effective_concurrent_stream_limit, - uint64_t configured_concurrent_stream_limit, - OptRef opt_data) + ActiveClient( + HttpConnPoolImplBase& parent, uint64_t lifetime_stream_limit, + uint64_t effective_concurrent_stream_limit, uint64_t configured_concurrent_stream_limit, + OptRef opt_data, + CreateConnectionDataFn connection_fn = + [](HttpConnPoolImplBase& parent) { + return static_cast(&parent) + ->host() + ->createConnection(parent.dispatcher(), parent.socketOptions(), + parent.transportSocketOptions()); + }) : Envoy::ConnectionPool::ActiveClient(parent, lifetime_stream_limit, effective_concurrent_stream_limit, configured_concurrent_stream_limit) { @@ -116,11 +125,8 @@ class ActiveClient : public Envoy::ConnectionPool::ActiveClient { initialize(opt_data.value(), parent); return; } - // The static cast makes sure we call the base class host() and not - // HttpConnPoolImplBase::host which is of a different type. - Upstream::Host::CreateConnectionData data = - static_cast(&parent)->host()->createConnection( - parent.dispatcher(), parent.socketOptions(), parent.transportSocketOptions()); + ENVOY_LOG(debug, "Creating CreateConnectionData"); + Upstream::Host::CreateConnectionData data = connection_fn(parent); initialize(data, parent); } @@ -204,7 +210,8 @@ class MultiplexedActiveClientBase : public CodecClientCallbacks, public: MultiplexedActiveClientBase(HttpConnPoolImplBase& parent, uint32_t effective_concurrent_streams, uint32_t max_configured_concurrent_streams, Stats::Counter& cx_total, - OptRef data); + OptRef data, + CreateConnectionDataFn connection_fn = nullptr); ~MultiplexedActiveClientBase() override = default; // Caps max streams per connection below 2^31 to prevent overflow. static uint64_t maxStreamsPerConnection(uint64_t max_streams_config); diff --git a/source/common/http/filter_manager.cc b/source/common/http/filter_manager.cc index 665c0572a9c3..4b7f36e5d37a 100644 --- a/source/common/http/filter_manager.cc +++ b/source/common/http/filter_manager.cc @@ -448,6 +448,10 @@ void ActiveStreamDecoderFilter::modifyDecodingBuffer( callback(*parent_.buffered_request_data_.get()); } +void ActiveStreamDecoderFilter::setReverseConnForceLocalReply(bool value) { + parent_.setReverseConnForceLocalReply(value); +} + void ActiveStreamDecoderFilter::sendLocalReply( Code code, absl::string_view body, std::function modify_headers, @@ -1005,7 +1009,7 @@ void DownstreamFilterManager::sendLocalReply( // We only prepare a local reply to execute later if we're actively // invoking filters to avoid re-entrant in filters. - if (state_.filter_call_state_ & FilterCallState::IsDecodingMask) { + if (!reverse_conn_force_local_reply_ && state_.filter_call_state_ & FilterCallState::IsDecodingMask) { prepareLocalReplyViaFilterChain(is_grpc_request, code, body, modify_headers, is_head_request, grpc_status, details); } else { diff --git a/source/common/http/filter_manager.h b/source/common/http/filter_manager.h index 2a2a8b31787a..39bf030205ef 100644 --- a/source/common/http/filter_manager.h +++ b/source/common/http/filter_manager.h @@ -324,6 +324,8 @@ struct ActiveStreamDecoderFilter : public ActiveStreamFilterBase, void stopDecodingIfNonTerminalFilterEncodedEndStream(bool encoded_end_stream); StreamDecoderFilters::Iterator entry() const { return entry_; } + void setReverseConnForceLocalReply(bool value) override; + StreamDecoderFilterSharedPtr handle_; StreamDecoderFilters::Iterator entry_{}; bool is_grpc_request_{}; @@ -909,6 +911,10 @@ class FilterManager : public ScopeTrackedObject, virtual bool shouldLoadShed() { return false; }; + void setReverseConnForceLocalReply(bool value) { + reverse_conn_force_local_reply_ = value; + } + void sendGoAwayAndClose() { // Stop filter chain iteration by checking encoder or decoder chain. if (state_.filter_call_state_ & FilterCallState::IsDecodingMask) { @@ -1105,6 +1111,7 @@ class FilterManager : public ScopeTrackedObject, const uint64_t stream_id_; Buffer::BufferMemoryAccountSharedPtr account_; const bool proxy_100_continue_; + bool reverse_conn_force_local_reply_{false}; StreamDecoderFilters decoder_filters_; StreamEncoderFilters encoder_filters_; diff --git a/source/common/http/headers.h b/source/common/http/headers.h index 17550b4a0b62..249b858054e5 100644 --- a/source/common/http/headers.h +++ b/source/common/http/headers.h @@ -238,6 +238,8 @@ class HeaderValues { const LowerCaseString XContentTypeOptions{"x-content-type-options"}; const LowerCaseString XSquashDebug{"x-squash-debug"}; const LowerCaseString EarlyData{"early-data"}; + const LowerCaseString EnvoyDstNodeUUID{"x-remote-node-id"}; + const LowerCaseString EnvoyDstClusterUUID{"x-dst-cluster-uuid"}; struct { const std::string Close{"close"}; diff --git a/source/common/http/http2/BUILD b/source/common/http/http2/BUILD index 34b9dadd3868..5a0a5c1029ac 100644 --- a/source/common/http/http2/BUILD +++ b/source/common/http/http2/BUILD @@ -83,6 +83,7 @@ envoy_cc_library( srcs = ["conn_pool.cc"], hdrs = ["conn_pool.h"], deps = [ + "//envoy/config:typed_config_interface", "//envoy/event:dispatcher_interface", "//envoy/upstream:upstream_interface", "//source/common/http:codec_client_lib", diff --git a/source/common/http/http2/conn_pool.cc b/source/common/http/http2/conn_pool.cc index 421f07abc34a..3fd0acd6c111 100644 --- a/source/common/http/http2/conn_pool.cc +++ b/source/common/http/http2/conn_pool.cc @@ -2,8 +2,8 @@ #include +#include "envoy/config/typed_config.h" #include "envoy/event/dispatcher.h" -#include "envoy/upstream/upstream.h" #include "source/common/http/http2/codec_impl.h" #include "source/common/runtime/runtime_features.h" @@ -37,11 +37,12 @@ uint32_t ActiveClient::calculateInitialStreamsLimit( } ActiveClient::ActiveClient(HttpConnPoolImplBase& parent, - OptRef data) + OptRef data, + CreateConnectionDataFn connection_fn) : MultiplexedActiveClientBase( parent, calculateInitialStreamsLimit(parent.cache(), parent.origin(), parent.host()), parent.host()->cluster().http2Options().max_concurrent_streams().value(), - parent.host()->cluster().trafficStats()->upstream_cx_http2_total_, data) {} + parent.host()->cluster().trafficStats()->upstream_cx_http2_total_, data, connection_fn) {} ConnectionPool::InstancePtr allocateConnPool(Event::Dispatcher& dispatcher, Random::RandomGenerator& random_generator, diff --git a/source/common/http/http2/conn_pool.h b/source/common/http/http2/conn_pool.h index 2de2700ce5c4..1d2352fdedd6 100644 --- a/source/common/http/http2/conn_pool.h +++ b/source/common/http/http2/conn_pool.h @@ -25,7 +25,8 @@ class ActiveClient : public MultiplexedActiveClientBase { Upstream::HostDescriptionConstSharedPtr host); ActiveClient(Envoy::Http::HttpConnPoolImplBase& parent, - OptRef data); + OptRef data, + CreateConnectionDataFn connection_fn = nullptr); }; ConnectionPool::InstancePtr @@ -37,6 +38,26 @@ allocateConnPool(Event::Dispatcher& dispatcher, Random::RandomGenerator& random_ absl::optional origin = absl::nullopt, Http::HttpServerPropertiesCacheSharedPtr http_server_properties_cache = nullptr); +/** + * Abstract class for allocating reverse connection pools. + */ +class ReverseConnPoolFactory : public Config::UntypedFactory { +public: + virtual ~ReverseConnPoolFactory() = default; + + virtual ConnectionPool::InstancePtr allocateConnPool( + Event::Dispatcher& dispatcher, Random::RandomGenerator& random_generator, + Singleton::Manager& singleton_manager, Upstream::HostConstSharedPtr host, + Upstream::ResourcePriority priority, + const Network::ConnectionSocket::OptionsSharedPtr& options, + const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options, + Upstream::ClusterConnectivityState& state, + absl::optional origin = absl::nullopt, + Http::HttpServerPropertiesCacheSharedPtr http_server_properties_cache = nullptr) PURE; + + std::string category() const override { return "envoy.http.reverse_conn"; } +}; + } // namespace Http2 } // namespace Http } // namespace Envoy diff --git a/source/common/json/json_internal.cc b/source/common/json/json_internal.cc index f4ee6f6012f6..9b67d36d47fc 100644 --- a/source/common/json/json_internal.cc +++ b/source/common/json/json_internal.cc @@ -794,10 +794,21 @@ std::string Factory::serialize(absl::string_view str) { return j.dump(-1, ' ', false, nlohmann::detail::error_handler_t::replace); } +template std::string Factory::serialize(const T& items) { + nlohmann::json j = nlohmann::json(items); + return j.dump(); +} + std::vector Factory::jsonToMsgpack(const std::string& json_string) { return nlohmann::json::to_msgpack(nlohmann::json::parse(json_string, nullptr, false)); } +// Template instantiation for serialize function. +template std::string Factory::serialize(const std::list& items); +template std::string Factory::serialize(const absl::flat_hash_set& items); +template std::string Factory::serialize( + const absl::flat_hash_map>& items); + } // namespace Nlohmann } // namespace Json } // namespace Envoy diff --git a/source/common/json/json_internal.h b/source/common/json/json_internal.h index 545a0560f2d3..903a9938aa70 100644 --- a/source/common/json/json_internal.h +++ b/source/common/json/json_internal.h @@ -39,6 +39,9 @@ class Factory { * See: https://github.com/msgpack/msgpack/blob/master/spec.md */ static std::vector jsonToMsgpack(const std::string& json); + + // Serialization helper function for iterable types. + template static std::string serialize(const T& items); }; } // namespace Nlohmann diff --git a/source/common/json/json_loader.cc b/source/common/json/json_loader.cc index c80121ee0385..130c9dbeac64 100644 --- a/source/common/json/json_loader.cc +++ b/source/common/json/json_loader.cc @@ -18,5 +18,9 @@ std::vector Factory::jsonToMsgpack(const std::string& json) { return Nlohmann::Factory::jsonToMsgpack(json); } +const std::string Factory::listAsJsonString(const std::list& items) { + return Nlohmann::Factory::serialize(items); +} + } // namespace Json } // namespace Envoy diff --git a/source/common/json/json_loader.h b/source/common/json/json_loader.h index a7f17e72a22f..f659e7ea25f7 100644 --- a/source/common/json/json_loader.h +++ b/source/common/json/json_loader.h @@ -7,6 +7,9 @@ #include "source/common/common/statusor.h" #include "source/common/protobuf/protobuf.h" +#include "absl/container/flat_hash_map.h" +#include "absl/container/flat_hash_set.h" + namespace Envoy { namespace Json { @@ -28,6 +31,11 @@ class Factory { * See: https://github.com/msgpack/msgpack/blob/master/spec.md */ static std::vector jsonToMsgpack(const std::string& json); + + /* + * Constructs a JSON string from a list of strings. + */ + static const std::string listAsJsonString(const std::list& items); }; } // namespace Json diff --git a/source/common/listener_manager/active_stream_listener_base.cc b/source/common/listener_manager/active_stream_listener_base.cc index 4995bec8b2d5..bccc5eeed92a 100644 --- a/source/common/listener_manager/active_stream_listener_base.cc +++ b/source/common/listener_manager/active_stream_listener_base.cc @@ -128,7 +128,9 @@ void ActiveTcpConnection::onEvent(Network::ConnectionEvent event) { if (event == Network::ConnectionEvent::LocalClose || event == Network::ConnectionEvent::RemoteClose) { stream_info_->setDownstreamTransportFailureReason(connection_->transportFailureReason()); - active_connections_.listener_.removeConnection(*this); + if (!connection_->isConnectionReused()) { + active_connections_.listener_.removeConnection(*this); + } } } diff --git a/source/common/listener_manager/active_stream_listener_base.h b/source/common/listener_manager/active_stream_listener_base.h index 783aaa2896c4..b1bad3cc7c24 100644 --- a/source/common/listener_manager/active_stream_listener_base.h +++ b/source/common/listener_manager/active_stream_listener_base.h @@ -201,7 +201,7 @@ class OwnedActiveStreamListenerBase : public ActiveStreamListenerBase { * Remove and destroy an active connection. * @param connection supplies the connection to remove. */ - void removeConnection(ActiveTcpConnection& connection); + virtual void removeConnection(ActiveTcpConnection& connection); protected: /** diff --git a/source/common/listener_manager/active_tcp_listener.cc b/source/common/listener_manager/active_tcp_listener.cc index 1f7c88f0a7c3..b0d60674403a 100644 --- a/source/common/listener_manager/active_tcp_listener.cc +++ b/source/common/listener_manager/active_tcp_listener.cc @@ -55,6 +55,7 @@ ActiveTcpListener::~ActiveTcpListener() { ASSERT(active_connections != nullptr); auto& connections = active_connections->connections_; while (!connections.empty()) { + connections.front()->connection_->setConnectionReused(false); connections.front()->connection_->close( Network::ConnectionCloseType::NoFlush, "purging_socket_that_have_not_progressed_to_connections"); diff --git a/source/common/listener_manager/active_tcp_socket.cc b/source/common/listener_manager/active_tcp_socket.cc index 0db63c47199e..d40267d8debe 100644 --- a/source/common/listener_manager/active_tcp_socket.cc +++ b/source/common/listener_manager/active_tcp_socket.cc @@ -74,6 +74,7 @@ void ActiveTcpSocket::createListenerFilterBuffer() { listener_filter_buffer_ = std::make_unique( socket_->ioHandle(), listener_.dispatcher(), [this](bool error) { + (*iter_)->onClose(); socket_->ioHandle().close(); if (error) { listener_.stats_.downstream_listener_filter_error_.inc(); diff --git a/source/common/listener_manager/active_tcp_socket.h b/source/common/listener_manager/active_tcp_socket.h index 6423f3ba54bd..9491a2d38713 100644 --- a/source/common/listener_manager/active_tcp_socket.h +++ b/source/common/listener_manager/active_tcp_socket.h @@ -53,6 +53,8 @@ class ActiveTcpSocket : public Network::ListenerFilterManager, } size_t maxReadBytes() const override { return listener_filter_->maxReadBytes(); } + + void onClose() override { return listener_filter_->onClose(); } }; using ListenerFilterWrapperPtr = std::unique_ptr; diff --git a/source/common/listener_manager/connection_handler_impl.cc b/source/common/listener_manager/connection_handler_impl.cc index efeaf37fd71c..08b492676988 100644 --- a/source/common/listener_manager/connection_handler_impl.cc +++ b/source/common/listener_manager/connection_handler_impl.cc @@ -80,6 +80,21 @@ void ConnectionHandlerImpl::addListener(absl::optional overridden_list config, config.listenSocketFactories()[0]->localAddress(), listener_reject_fraction_, disable_listeners_, std::move(internal_listener), config.shouldBypassOverloadManager() ? null_overload_manager_ : overload_manager_); + } else if (config.reverseConnectionListenerConfig().has_value()) { + ENVOY_LOG(debug, "adding reverse connection listener with name: {} tag: {}", config.name(), + config.listenerTag()); + Network::RevConnRegistry& reverse_conn_registry = + config.reverseConnectionListenerConfig()->reverseConnRegistry(); + ENVOY_LOG(debug, "Obtaining thread local reverse conn registry"); + local_reverse_conn_registry_ = reverse_conn_registry.getLocalRegistry(); + RELEASE_ASSERT(local_reverse_conn_registry_ != nullptr, + "Failed to get local reverse conn listener registry"); + auto rc_listener = local_reverse_conn_registry_->createActiveReverseConnectionListener( + *this, dispatcher(), config); + details->addActiveListener( + config, config.listenSocketFactories()[0]->localAddress(), listener_reject_fraction_, + disable_listeners_, std::move(rc_listener), + config.shouldBypassOverloadManager() ? null_overload_manager_ : overload_manager_); } else if (config.listenSocketFactories()[0]->socketType() == Network::Socket::Type::Stream) { auto overload_state = config.shouldBypassOverloadManager() @@ -155,6 +170,7 @@ void ConnectionHandlerImpl::addListener(absl::optional overridden_list per_address_details->address_->envoyInternalAddress()->addressId(), per_address_details); } } + details->setName(config.name()); listener_map_by_tag_.emplace(config.listenerTag(), std::move(details)); } @@ -309,6 +325,29 @@ void ConnectionHandlerImpl::setListenerRejectFraction(UnitFloat reject_fraction) } } +void ConnectionHandlerImpl::saveUpstreamConnection(Network::ConnectionSocketPtr&& upstream_socket, + uint64_t listener_tag) { + auto listener = findActiveListenerByTag(listener_tag); + if (listener.has_value()) { + ENVOY_LOG(debug, "saving reverse connection for {} to listener {} tag {}", + upstream_socket->connectionInfoProvider().remoteAddress()->asString(), + listener->get().name_, listener_tag); + for (auto address_list : listener->get().per_address_details_list_) { + if (address_list->reverseConnectionListener().has_value()) { + ENVOY_LOG(debug, "passing reverse connection to listener {} tag {}", listener->get().name_, + listener_tag); + address_list->reverseConnectionListener()->onAccept(std::move(upstream_socket)); + ENVOY_LOG(debug, "reverse connection passed to listener {} tag {}", listener->get().name_, + listener_tag); + return; + } else { + ENVOY_LOG(debug, "no reverse connection listener for name {} tag {}", listener->get().name_, + listener_tag); + } + } + } +} + Network::InternalListenerOptRef ConnectionHandlerImpl::findByAddress(const Network::Address::InstanceConstSharedPtr& address) { ASSERT(address->type() == Network::Address::Type::EnvoyInternal); @@ -338,6 +377,14 @@ ConnectionHandlerImpl::PerAddressActiveListenerDetails::internalListener() { return (val != nullptr) ? makeOptRef(val->get()) : absl::nullopt; } +OptRef +ConnectionHandlerImpl::PerAddressActiveListenerDetails::reverseConnectionListener() { + auto* val = absl::get_if< + std::reference_wrapper>( + &typed_listener_); + return (val != nullptr) ? makeOptRef(val->get()) : absl::nullopt; +} + ConnectionHandlerImpl::ActiveListenerDetailsOptRef ConnectionHandlerImpl::findActiveListenerByTag(uint64_t listener_tag) { if (auto iter = listener_map_by_tag_.find(listener_tag); iter != listener_map_by_tag_.end()) { diff --git a/source/common/listener_manager/connection_handler_impl.h b/source/common/listener_manager/connection_handler_impl.h index c5a704ed9a70..75f073459233 100644 --- a/source/common/listener_manager/connection_handler_impl.h +++ b/source/common/listener_manager/connection_handler_impl.h @@ -23,6 +23,7 @@ namespace Server { class ActiveUdpListenerBase; class ActiveTcpListener; class ActiveInternalListener; +class ActiveReverseConnectionListener; /** * Server side connection handler. This is used both by workers as well as the @@ -57,7 +58,11 @@ class ConnectionHandlerImpl : public ConnectionHandler, void enableListeners() override; void setListenerRejectFraction(UnitFloat reject_fraction) override; const std::string& statPrefix() const override { return per_handler_stat_prefix_; } - + void saveUpstreamConnection(Network::ConnectionSocketPtr&& upstream_socket, + uint64_t listener_tag) override; + Network::LocalRevConnRegistry& reverseConnRegistry() const override { + return *local_reverse_conn_registry_; + } // Network::TcpConnectionHandler Event::Dispatcher& dispatcher() override { return dispatcher_; } Network::BalancedConnectionHandlerOptRef @@ -91,18 +96,22 @@ class ConnectionHandlerImpl : public ConnectionHandler, absl::variant, std::reference_wrapper, - std::reference_wrapper> + std::reference_wrapper, + std::reference_wrapper> typed_listener_; // Helpers for accessing the data in the variant for cleaner code. ActiveTcpListenerOptRef tcpListener(); UdpListenerCallbacksOptRef udpListener(); Network::InternalListenerOptRef internalListener(); + OptRef reverseConnectionListener(); }; struct ActiveListenerDetails { std::vector> per_address_details_list_; + std::string name_; + using ListenerMethodFn = std::function; /** @@ -115,6 +124,10 @@ class ConnectionHandlerImpl : public ConnectionHandler, }); } + void setName(const std::string& name) { + name_ = name; + } + /** * Add an ActiveListener into the list. */ @@ -153,6 +166,7 @@ class ConnectionHandlerImpl : public ConnectionHandler, // This has a value on worker threads, and no value on the main thread. const absl::optional worker_index_; Event::Dispatcher& dispatcher_; + Network::LocalRevConnRegistry* local_reverse_conn_registry_; OptRef overload_manager_; OptRef null_overload_manager_; const std::string per_handler_stat_prefix_; diff --git a/source/common/listener_manager/listener_impl.cc b/source/common/listener_manager/listener_impl.cc index 2affcdc55301..747845422d90 100644 --- a/source/common/listener_manager/listener_impl.cc +++ b/source/common/listener_manager/listener_impl.cc @@ -247,6 +247,9 @@ std::string listenerStatsScope(const envoy::config::listener::v3::Listener& conf if (config.has_internal_listener()) { return absl::StrCat("envoy_internal_", config.name()); } + if (config.has_reverse_connection_listener_config()) { + return absl::StrCat("reverse_connection_listener_", config.name()); + } auto address_or_error = Network::Address::resolveProtoAddress(config.address()); if (address_or_error.status().ok()) { return address_or_error.value()->asString(); @@ -412,6 +415,7 @@ ListenerImpl::ListenerImpl(const envoy::config::listener::v3::Listener& config, buildOriginalDstListenerFilter(config); buildProxyProtocolListenerFilter(config); SET_AND_RETURN_IF_NOT_OK(buildInternalListener(config), creation_status); + SET_AND_RETURN_IF_NOT_OK(buildReverseConnectionListener(config), creation_status); } if (!workers_started_) { // Initialize dynamic_init_manager_ from Server's init manager if it's not initialized. @@ -479,6 +483,7 @@ ListenerImpl::ListenerImpl(ListenerImpl& origin, SET_AND_RETURN_IF_NOT_OK(validateFilterChains(config), creation_status); SET_AND_RETURN_IF_NOT_OK(buildFilterChains(config), creation_status); SET_AND_RETURN_IF_NOT_OK(buildInternalListener(config), creation_status); + SET_AND_RETURN_IF_NOT_OK(buildReverseConnectionListener(config), creation_status); if (socket_type_ == Network::Socket::Type::Stream) { // Apply the options below only for TCP. buildSocketOptions(config); @@ -591,6 +596,42 @@ ListenerImpl::buildInternalListener(const envoy::config::listener::v3::Listener& return absl::OkStatus(); } +absl::Status +ListenerImpl::buildReverseConnectionListener(const envoy::config::listener::v3::Listener& config) { + + ENVOY_LOG(debug, "Listener: {}; Reverse conn metadata : {}", config.name(), + config.reverse_connection_listener_config().DebugString()); + + if (!config.has_reverse_connection_listener_config()) { + ENVOY_LOG(info, "Listener: {}; Reverse connection listener config is not present. Listener will bind to port", config.name()); + return absl::OkStatus(); + } + // Reverse connection listener should not bind to port. + bind_to_port_ = false; + + ENVOY_LOG(debug, "Building reverse connection config for listener: {} tag: {}", config.name(), + listener_tag_); + std::shared_ptr reverse_conn_registry = + parent_.server_.singletonManager() + .getTyped("reverse_conn_registry_singleton"); + if (reverse_conn_registry == nullptr) { + ENVOY_LOG(error, "Cannot build reverse conn listener name: {} tag: {}. Reverse conn registry not found", + config.name(), listener_tag_); + return absl::OkStatus(); + } + auto config_or_error = + reverse_conn_registry->fromAnyConfig(config.reverse_connection_listener_config()); + if (!config_or_error.ok() || *config_or_error == nullptr) { + return absl::InvalidArgumentError( + fmt::format("Cannot build reverse conn listener name: {} tag: {} Error {} : failed to " + "unpack reverse connection " + "config", + config.name(), listener_tag_, config_or_error.status())); + } + reverse_connection_listener_config_ = std::move(*config_or_error); + return absl::OkStatus(); +} + bool ListenerImpl::buildUdpListenerWorkerRouter(const Network::Address::Instance& address, uint32_t concurrency) { if (socket_type_ != Network::Socket::Type::Datagram) { diff --git a/source/common/listener_manager/listener_impl.h b/source/common/listener_manager/listener_impl.h index 2f37b82b1dbb..46a548f16ff6 100644 --- a/source/common/listener_manager/listener_impl.h +++ b/source/common/listener_manager/listener_impl.h @@ -264,7 +264,7 @@ class ListenerImpl final : public Network::ListenerConfig, ASSERT(listen_socket_options_list_.size() > address_index); return listen_socket_options_list_[address_index]; } - const std::string& versionInfo() const { return version_info_; } + const std::string& versionInfo() const override { return version_info_; } bool reusePort() const { return reuse_port_; } static bool getReusePortOrDefault(Server::Instance& server, const envoy::config::listener::v3::Listener& config, @@ -308,6 +308,11 @@ class ListenerImpl final : public Network::ListenerConfig, return internal_listener_config_ != nullptr ? *internal_listener_config_ : Network::InternalListenerConfigOptRef(); } + Network::ReverseConnectionListenerConfigOptRef reverseConnectionListenerConfig() const override { + return reverse_connection_listener_config_ != nullptr + ? *reverse_connection_listener_config_ + : Network::ReverseConnectionListenerConfigOptRef(); + } Network::ConnectionBalancer& connectionBalancer(const Network::Address::Instance& address) override { auto balancer = connection_balancers_.find(address.asString()); @@ -398,6 +403,7 @@ class ListenerImpl final : public Network::ListenerConfig, // Helpers for constructor. void buildAccessLog(const envoy::config::listener::v3::Listener& config); absl::Status buildInternalListener(const envoy::config::listener::v3::Listener& config); + absl::Status buildReverseConnectionListener(const envoy::config::listener::v3::Listener& config); absl::Status validateConfig(); bool buildUdpListenerWorkerRouter(const Network::Address::Instance& address, uint32_t concurrency); @@ -428,7 +434,7 @@ class ListenerImpl final : public Network::ListenerConfig, const Network::Socket::Type socket_type_; std::vector socket_factories_; - const bool bind_to_port_; + bool bind_to_port_; const bool mptcp_enabled_; const bool hand_off_restored_destination_connections_; const uint32_t per_connection_buffer_limit_bytes_; @@ -461,6 +467,7 @@ class ListenerImpl final : public Network::ListenerConfig, const bool continue_on_listener_filters_timeout_; std::shared_ptr udp_listener_config_; std::unique_ptr internal_listener_config_; + std::unique_ptr reverse_connection_listener_config_; // The key is the address string, the value is the address specific connection balancer. // TODO (soulxu): Add hash support for address, then needn't a string address as key anymore. absl::flat_hash_map connection_balancers_; diff --git a/source/common/listener_manager/listener_manager_impl.cc b/source/common/listener_manager/listener_manager_impl.cc index 8de7a7da3360..b0ac87fcbe82 100644 --- a/source/common/listener_manager/listener_manager_impl.cc +++ b/source/common/listener_manager/listener_manager_impl.cc @@ -371,6 +371,7 @@ ListenerManagerImpl::ListenerManagerImpl(Instance& server, for (uint32_t i = 0; i < server.options().concurrency(); i++) { workers_.emplace_back(worker_factory.createWorker( i, server.overloadManager(), server.nullOverloadManager(), absl::StrCat("worker_", i))); + ENVOY_LOG(debug, "Starting worker {}", i); } } diff --git a/source/common/network/connection_impl.cc b/source/common/network/connection_impl.cc index 71236c272fd7..150caffed9d2 100644 --- a/source/common/network/connection_impl.cc +++ b/source/common/network/connection_impl.cc @@ -118,8 +118,6 @@ ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPt } ConnectionImpl::~ConnectionImpl() { - ASSERT(!socket_->isOpen() && delayed_close_timer_ == nullptr, - "ConnectionImpl was unexpectedly torn down without being closed."); // In general we assume that owning code has called close() previously to the destructor being // run. This generally must be done so that callbacks run in the correct context (vs. deferred @@ -145,7 +143,8 @@ void ConnectionImpl::removeReadFilter(ReadFilterSharedPtr filter) { bool ConnectionImpl::initializeReadFilters() { return filter_manager_.initializeReadFilters(); } void ConnectionImpl::close(ConnectionCloseType type) { - if (!socket_->isOpen()) { + if (socket_== nullptr || !socket_->isOpen()) { + ENVOY_CONN_LOG_EVENT(debug, "connection_closing", "Not closing conn, socket object is null or socket is not open", *this); return; } @@ -169,7 +168,12 @@ void ConnectionImpl::close(ConnectionCloseType type) { if (data_to_write > 0 && type != ConnectionCloseType::Abort) { // We aren't going to wait to flush, but try to write as much as we can if there is pending // data. - transport_socket_->doWrite(*write_buffer_, true); + if (reuse_connection_) { + // Don't close connection socket in case of reversed connection. + transport_socket_->doWrite(*write_buffer_, false); + } else { + transport_socket_->doWrite(*write_buffer_, true); + } } if (type != ConnectionCloseType::FlushWriteAndDelay || !delayed_close_timeout_set) { @@ -267,7 +271,8 @@ void ConnectionImpl::setDetectedCloseType(DetectedCloseType close_type) { } void ConnectionImpl::closeSocket(ConnectionEvent close_type) { - if (!socket_->isOpen()) { + + if (socket_ == nullptr || !socket_->isOpen()) { return; } @@ -278,7 +283,10 @@ void ConnectionImpl::closeSocket(ConnectionEvent close_type) { } ENVOY_CONN_LOG(debug, "closing socket: {}", *this, static_cast(close_type)); - transport_socket_->closeSocket(close_type); + if (!reuse_connection_) { + ENVOY_CONN_LOG(debug, "closing socket transport_socket_", *this); + transport_socket_->closeSocket(close_type); + } // Drain input and output buffers. updateReadBufferStats(0, 0); @@ -304,8 +312,10 @@ void ConnectionImpl::closeSocket(ConnectionEvent close_type) { #endif } - // It is safe to call close() since there is an IO handle check. - socket_->close(); + if (!reuse_connection_) { + ENVOY_LOG(debug, "closeSocket:"); + socket_->close(); + } // Call the base class directly as close() is called in the destructor. ConnectionImpl::raiseEvent(close_type); @@ -1016,6 +1026,18 @@ ClientConnectionImpl::ClientConnectionImpl( } } +// Constructor to create "clientConnection" object from an existing downstream connection +ClientConnectionImpl::ClientConnectionImpl(Event::Dispatcher& dispatcher, + Network::TransportSocketPtr&& transport_socket, + Network::ConnectionSocketPtr&& downstream_socket) + : ConnectionImpl(dispatcher, std::move(downstream_socket), std::move(transport_socket), + stream_info_, false), + stream_info_(dispatcher.timeSource(), socket_->connectionInfoProviderSharedPtr(), + StreamInfo::FilterState::LifeSpan::Connection) { + + stream_info_.setUpstreamInfo(std::make_shared()); +} + void ClientConnectionImpl::connect() { ENVOY_CONN_LOG_EVENT(debug, "client_connection", "connecting to {}", *this, socket_->connectionInfoProvider().remoteAddress()->asString()); diff --git a/source/common/network/connection_impl.h b/source/common/network/connection_impl.h index 237f027e9311..c8ddf19eba60 100644 --- a/source/common/network/connection_impl.h +++ b/source/common/network/connection_impl.h @@ -62,6 +62,13 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback void removeReadFilter(ReadFilterSharedPtr filter) override; bool initializeReadFilters() override; + // New Method added to retrieve the connection socket. + // so that, new connection can be created with existing connection socket. + // This is required for reverse connections. + const ConnectionSocketPtr& getSocket() const override { return socket_; } + void setConnectionReused(bool value) override { reuse_connection_ = value; } + bool isConnectionReused() override { return reuse_connection_; } + // Network::Connection void addBytesSentCallback(BytesSentCb cb) override; void enableHalfClose(bool enabled) override; @@ -143,6 +150,11 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback void flushWriteBuffer() override; TransportSocketPtr& transportSocket() { return transport_socket_; } + // Used on the responder envoy to mark an active connection accepted by a listener which will + // be used as a reverse connection. The socket for such a connection is closed upon draining + // of the owning listener. + bool reuse_connection_ = false; + // Obtain global next connection ID. This should only be used in tests. static uint64_t nextGlobalIdForTest() { return next_global_id_; } @@ -285,9 +297,13 @@ class ClientConnectionImpl : public ConnectionImpl, virtual public ClientConnect Network::TransportSocketPtr&& transport_socket, const Network::ConnectionSocket::OptionsSharedPtr& options, const Network::TransportSocketOptionsConstSharedPtr& transport_options); + // Method to create client connection from downstream connection + ClientConnectionImpl(Event::Dispatcher& dispatcher, + Network::TransportSocketPtr&& transport_socket, + Network::ConnectionSocketPtr&& downstream_socket); // Network::ClientConnection - void connect() override; + virtual void connect() override; private: void onConnected() override; diff --git a/source/common/network/happy_eyeballs_connection_impl.h b/source/common/network/happy_eyeballs_connection_impl.h index 94d179147250..f7c95135dd17 100644 --- a/source/common/network/happy_eyeballs_connection_impl.h +++ b/source/common/network/happy_eyeballs_connection_impl.h @@ -95,6 +95,9 @@ class HappyEyeballsConnectionImpl : public MultiConnectionBaseImpl, dispatcher, address_list, upstream_local_address_selector, socket_factory, transport_socket_options, host, options, happy_eyeballs_config)) {} + const Network::ConnectionSocketPtr& getSocket() const override { PANIC("not implemented"); } + void setConnectionReused(bool) override { PANIC("not implemented"); } + bool isConnectionReused() override { PANIC("not implemented"); } }; } // namespace Network diff --git a/source/common/network/multi_connection_base_impl.h b/source/common/network/multi_connection_base_impl.h index 6bb373114809..261319deef4d 100644 --- a/source/common/network/multi_connection_base_impl.h +++ b/source/common/network/multi_connection_base_impl.h @@ -133,6 +133,10 @@ class MultiConnectionBaseImpl : public ClientConnection, void hashKey(std::vector& hash_key) const override; void dumpState(std::ostream& os, int indent_level) const override; + const Network::ConnectionSocketPtr& getSocket() const override { PANIC("not implemented"); } + void setConnectionReused(bool) override { PANIC("not implemented"); } + bool isConnectionReused() override { PANIC("not implemented"); } + private: // ConnectionCallbacks which will be set on an ClientConnection which // sends connection events back to the MultiConnectionBaseImpl. diff --git a/source/common/quic/envoy_quic_client_connection.cc b/source/common/quic/envoy_quic_client_connection.cc index 1e761823b8fb..c3d335daf08e 100644 --- a/source/common/quic/envoy_quic_client_connection.cc +++ b/source/common/quic/envoy_quic_client_connection.cc @@ -230,7 +230,7 @@ void EnvoyQuicClientConnection::onPathValidationFailure( // Note that the probing socket and probing writer will be deleted once context goes out of // scope. OnPathValidationFailureAtClient(/*is_multi_port=*/false, *context); - std::unique_ptr probing_socket = + auto probing_socket = static_cast(context.get())->releaseSocket(); // Extend the socket life time till the end of the current event loop. dispatcher_.deferredDelete(std::make_unique(std::move(probing_socket))); @@ -287,8 +287,7 @@ void EnvoyQuicClientConnection::setNumPtosForPortMigration(uint32_t num_ptos_for EnvoyQuicClientConnection::EnvoyQuicPathValidationContext::EnvoyQuicPathValidationContext( const quic::QuicSocketAddress& self_address, const quic::QuicSocketAddress& peer_address, - std::unique_ptr writer, - std::unique_ptr probing_socket) + std::unique_ptr writer, Network::ConnectionSocketPtr probing_socket) : QuicPathValidationContext(self_address, peer_address), writer_(std::move(writer)), socket_(std::move(probing_socket)) {} @@ -303,7 +302,7 @@ EnvoyQuicPacketWriter* EnvoyQuicClientConnection::EnvoyQuicPathValidationContext return writer_.release(); } -std::unique_ptr +Network::ConnectionSocketPtr EnvoyQuicClientConnection::EnvoyQuicPathValidationContext::releaseSocket() { return std::move(socket_); } diff --git a/source/common/quic/envoy_quic_client_connection.h b/source/common/quic/envoy_quic_client_connection.h index c66af480535a..98c5a5c68526 100644 --- a/source/common/quic/envoy_quic_client_connection.h +++ b/source/common/quic/envoy_quic_client_connection.h @@ -33,7 +33,7 @@ class EnvoyQuicClientConnection : public quic::QuicConnection, EnvoyQuicPathValidationContext(const quic::QuicSocketAddress& self_address, const quic::QuicSocketAddress& peer_address, std::unique_ptr writer, - std::unique_ptr probing_socket); + Network::ConnectionSocketPtr probing_socket); ~EnvoyQuicPathValidationContext() override; @@ -43,7 +43,7 @@ class EnvoyQuicClientConnection : public quic::QuicConnection, Network::ConnectionSocket& probingSocket(); - std::unique_ptr releaseSocket(); + Network::ConnectionSocketPtr releaseSocket(); private: std::unique_ptr writer_; diff --git a/source/common/quic/quic_filter_manager_connection_impl.h b/source/common/quic/quic_filter_manager_connection_impl.h index da591a31b86b..a227f59fcfb5 100644 --- a/source/common/quic/quic_filter_manager_connection_impl.h +++ b/source/common/quic/quic_filter_manager_connection_impl.h @@ -142,6 +142,9 @@ class QuicFilterManagerConnectionImpl : public Network::ConnectionImplBase, void configureInitialCongestionWindow(uint64_t bandwidth_bits_per_sec, std::chrono::microseconds rtt) override; absl::optional congestionWindowInBytes() const override; + const Network::ConnectionSocketPtr& getSocket() const override { PANIC("not implemented"); } + void setConnectionReused(bool) override { PANIC("not implemented"); } + bool isConnectionReused() override { PANIC("not implemented"); } // Network::FilterManagerConnection void rawWrite(Buffer::Instance& data, bool end_stream) override; diff --git a/source/common/tcp_proxy/tcp_proxy.h b/source/common/tcp_proxy/tcp_proxy.h index 5dee1944706b..949bddc86119 100644 --- a/source/common/tcp_proxy/tcp_proxy.h +++ b/source/common/tcp_proxy/tcp_proxy.h @@ -570,6 +570,10 @@ class Filter : public Network::ReadFilter, os << spaces << "TcpProxy " << this << DUMP_MEMBER(streamId()) << "\n"; DUMP_DETAILS(parent_->getStreamInfo().upstreamInfo()); } + + void setReverseConnForceLocalReply(bool value) override { + ENVOY_LOG_MISC(error, "Cannot set value {}.", value); + } Filter* parent_{}; Http::RequestTrailerMapPtr request_trailer_map_; std::shared_ptr route_; diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index ae66348835f3..e8497ce4a5da 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -2425,6 +2425,17 @@ Http::ConnectionPool::InstancePtr ProdClusterManagerFactory::allocateConnPool( } if (protocols.size() == 1 && protocols[0] == Http::Protocol::Http2 && context_.runtime().snapshot().featureEnabled("upstream.use_http2", 100)) { + if (host->cluster().clusterType().has_value() && host->cluster().clusterType()->name() == "envoy.clusters.reverse_connection") { + ENVOY_LOG_MISC(debug, "Allocating reverse connection conn pool"); + auto* factory = Config::Utility::getFactoryByName( + "envoy.http.reverse_conn.default"); + if (factory) { + return factory->allocateConnPool( + dispatcher, context_.api().randomGenerator(), server_.singletonManager(), host, + priority, options, transport_socket_options, state, origin, alternate_protocols_cache); + } + else throw EnvoyException("Failed to create reverse connection conn pool. Cannot find a factory implementation for reverse connection conn pool."); + } return Http::Http2::allocateConnPool(dispatcher, context_.api().randomGenerator(), host, priority, options, transport_socket_options, state, origin, alternate_protocols_cache); diff --git a/source/common/upstream/upstream_impl.h b/source/common/upstream/upstream_impl.h index c713f8d3caaa..5ce6f50e8f80 100644 --- a/source/common/upstream/upstream_impl.h +++ b/source/common/upstream/upstream_impl.h @@ -226,6 +226,9 @@ class HostDescriptionImplBase : virtual public HostDescription, const envoy::config::core::v3::Metadata* metadata) const override; absl::optional lastHcPassTime() const override { return last_hc_pass_time_; } + const absl::string_view getHostId() const override { return host_id_; } + void setHostId(const std::string host_id) override { host_id_ = host_id; } + void setHealthChecker(HealthCheckHostMonitorPtr&& health_checker) override { health_checker_ = std::move(health_checker); } @@ -280,6 +283,8 @@ class HostDescriptionImplBase : virtual public HostDescription, socket_factory_ ABSL_GUARDED_BY(metadata_mutex_); const MonotonicTime creation_time_; absl::optional last_hc_pass_time_; + // This field is needed to fetch socket from rc_handler for reverse connection. + std::string host_id_; HostLbPolicyDataPtr lb_policy_data_; }; diff --git a/source/extensions/clusters/common/logical_host.h b/source/extensions/clusters/common/logical_host.h index 3e4734f69c79..675cbdb1fabc 100644 --- a/source/extensions/clusters/common/logical_host.h +++ b/source/extensions/clusters/common/logical_host.h @@ -124,6 +124,13 @@ class RealHostDescription : public HostDescription { absl::optional lastHcPassTime() const override { return logical_host_->lastHcPassTime(); } + + const absl::string_view getHostId() const override { return logical_host_->getHostId(); } + void setHostId(const std::string host_id) override { + ENVOY_LOG_MISC(error, "Cannot set host id to {} for const RealHostDescription.", host_id); + PANIC("not implemented"); + } + uint32_t priority() const override { return logical_host_->priority(); } Network::UpstreamTransportSocketFactory& resolveTransportSocketFactory(const Network::Address::InstanceConstSharedPtr& dest_address, diff --git a/source/server/BUILD b/source/server/BUILD index f37fa4146c0d..3301d8ce5f44 100644 --- a/source/server/BUILD +++ b/source/server/BUILD @@ -450,6 +450,7 @@ envoy_cc_library( "//source/common/init:manager_lib", "//source/common/local_info:local_info_lib", "//source/common/memory:stats_lib", + "//source/common/network:connection_lib", "//source/common/protobuf:utility_lib", "//source/common/quic:quic_stat_names_lib", "//source/common/runtime:runtime_keys_lib", diff --git a/source/server/admin/admin.h b/source/server/admin/admin.h index 1a98fe1be166..aa3f1e752a23 100644 --- a/source/server/admin/admin.h +++ b/source/server/admin/admin.h @@ -379,6 +379,11 @@ class AdminImpl : public Admin, return parent_.socket_factories_; } bool bindToPort() const override { return true; } + Network::ReverseConnectionListenerConfigOptRef + reverseConnectionListenerConfig() const override { + ENVOY_LOG(info, "Reverse connection config is not supported for Admin listener."); + return Network::ReverseConnectionListenerConfigOptRef(); + } bool handOffRestoredDestinationConnections() const override { return false; } uint32_t perConnectionBufferLimitBytes() const override { return 0; } std::chrono::milliseconds listenerFiltersTimeout() const override { return {}; } @@ -386,6 +391,9 @@ class AdminImpl : public Admin, Stats::Scope& listenerScope() override { return scope_; } uint64_t listenerTag() const override { return 0; } const std::string& name() const override { return name_; } + const std::string& versionInfo() const override { + PANIC("Not implemented for Admin listener."); + } const Network::ListenerInfoConstSharedPtr& listenerInfo() const override { return parent_.listener_info_; } diff --git a/source/server/api_listener_impl.h b/source/server/api_listener_impl.h index 9d04101457d2..b096be08289d 100644 --- a/source/server/api_listener_impl.h +++ b/source/server/api_listener_impl.h @@ -114,6 +114,9 @@ class ApiListenerImplBase : public ApiListener, void removeConnectionCallbacks(Network::ConnectionCallbacks& cb) override { callbacks_.remove(&cb); } + const Network::ConnectionSocketPtr& getSocket() const override { PANIC("not implemented"); } + void setConnectionReused(bool) override { PANIC("not implemented"); } + bool isConnectionReused() override { PANIC("not implemented"); } void addBytesSentCallback(Network::Connection::BytesSentCb) override { IS_ENVOY_BUG("Unexpected function call"); } diff --git a/source/server/server.cc b/source/server/server.cc index 45baa06e9d39..cbb689a2726b 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -636,6 +636,7 @@ absl::Status InstanceBase::initializeOrThrow(Network::Address::InstanceConstShar auto config = Config::Utility::translateAnyToFactoryConfig( bootstrap_extension.typed_config(), messageValidationContext().staticValidationVisitor(), factory); + ENVOY_LOG(trace, "creating bootstrap extension from factory: {}", factory.name()); bootstrap_extensions_.push_back( factory.createBootstrapExtension(*config, serverFactoryContext())); } diff --git a/test/common/json/json_loader_test.cc b/test/common/json/json_loader_test.cc index 99e44aedd15c..aaf004c58c28 100644 --- a/test/common/json/json_loader_test.cc +++ b/test/common/json/json_loader_test.cc @@ -534,6 +534,26 @@ TEST_F(JsonLoaderTest, InvalidJsonToMsgpack) { EXPECT_EQ(0, Factory::jsonToMsgpack("{\"hello\":\"world\"").size()); } +TEST_F(JsonLoaderTest, EmptyListAsJsonString) { + std::list list{}; + std::string json_string = Factory::listAsJsonString(list); + EXPECT_EQ(json_string, "[]"); +} + +TEST_F(JsonLoaderTest, ValidListAsJsonString) { + std::list list{"item1", "item2", "item3"}; + std::string json_string = Factory::listAsJsonString(list); + EXPECT_EQ(json_string, R"(["item1","item2","item3"])"); +} + +TEST_F(JsonLoaderTest, NestedListAsJsonString) { + std::list list{"item1", "item2", "item3"}; + std::list nested_list{"nested_item1", "nested_item2"}; + list.push_back(Factory::listAsJsonString(nested_list)); + std::string json_string = Factory::listAsJsonString(list); + EXPECT_EQ(json_string, R"(["item1","item2","item3","[\"nested_item1\",\"nested_item2\"]"])"); +} + } // namespace } // namespace Json } // namespace Envoy diff --git a/test/common/listener_manager/listener_manager_impl_test.cc b/test/common/listener_manager/listener_manager_impl_test.cc index cd7a7f658fe6..7f64ca7b58cf 100644 --- a/test/common/listener_manager/listener_manager_impl_test.cc +++ b/test/common/listener_manager/listener_manager_impl_test.cc @@ -53,6 +53,7 @@ using testing::Throw; // For internal listener test only. SINGLETON_MANAGER_REGISTRATION(internal_listener_registry); +SINGLETON_MANAGER_REGISTRATION(reverse_conn_registry); class ListenerManagerImplWithDispatcherStatsTest : public ListenerManagerImplTest { protected: @@ -66,6 +67,9 @@ class ListenerManagerImplWithRealFiltersTest : public ListenerManagerImplTest { ASSERT_NE(nullptr, server_.singletonManager().getTyped( "internal_listener_registry_singleton", [registry = internal_registry_]() { return registry; })); + ASSERT_NE(nullptr, server_.singletonManager().getTyped( + "reverse_conn_registry_singleton", + [registry = rev_conn_registry_]() { return registry; })); } /** @@ -756,6 +760,76 @@ bind_to_port: false EXPECT_EQ(1UL, server_.stats_store_.counterFromString("listener.127.0.0.1_1234.foo").value()); } +TEST_P(ListenerManagerImplWithRealFiltersTest, RejectReverseConnListenerWithNullRegistry) { + // When the reverse conn registry is not set, the reverse conn listener + // config will not be built. + const std::string yaml = R"EOF( + address: + socket_address: + address: 127.0.0.1 + port_value: 1234 + reverse_connection_listener_config: {} + filter_chains: + - filters: + name: foo + )EOF"; + + // Simulate reverse_conn_registry being nullptr + rev_conn_registry_ = nullptr; + + EXPECT_NO_THROW(addOrUpdateListener(parseListenerFromV3Yaml(yaml))); + + const auto& listeners = manager_->listeners(); + ASSERT_EQ(listeners.size(), 1); + EXPECT_FALSE(listeners[0].get().reverseConnectionListenerConfig().has_value()); +} + +TEST_P(ListenerManagerImplWithRealFiltersTest, RejectReverseConnListenerWithInvalidConfig) { + // Reverse conn listener is not built when the listener's reverse conn listener + // config has missing fields. + const std::string yaml = R"EOF( + address: + socket_address: + address: 127.0.0.1 + port_value: 1234 + reverse_connection_listener_config: {} + filter_chains: + - filters: + name: foo + )EOF"; + + // Set up the expectation that fromAnyConfig will return an InvalidArgumentError + EXPECT_CALL(*rev_conn_registry_, fromAnyConfig(_)) + .WillOnce(Return(ByMove(absl::InvalidArgumentError("Source node ID is missing in reverse connection listener config")))); + + EXPECT_THROW_WITH_REGEX( + addOrUpdateListener(parseListenerFromV3Yaml(yaml)), EnvoyException, + "failed to unpack reverse connection config"); +} + +TEST_P(ListenerManagerImplWithRealFiltersTest, AcceptReverseConnListenerWithValidConfig) { + // Reverse conn listener built successfully with a valid config. + const std::string yaml = R"EOF( + address: + socket_address: + address: 127.0.0.1 + port_value: 1234 + reverse_connection_listener_config: {} + filter_chains: + - filters: + name: foo + )EOF"; + + // Set up the expectation that fromAnyConfig will return a valid configuration + EXPECT_CALL(*rev_conn_registry_, fromAnyConfig(_)) + .WillOnce(Return(ByMove(std::move(reverse_connection_listener_config_)))); + EXPECT_NO_THROW(addOrUpdateListener(parseListenerFromV3Yaml(yaml))); + + const auto& listeners = manager_->listeners(); + ASSERT_EQ(listeners.size(), 1); + EXPECT_TRUE(listeners[0].get().reverseConnectionListenerConfig().has_value()); +} + TEST_P(ListenerManagerImplTest, MultipleSocketTypeSpecifiedInAddresses) { const std::string yaml = R"EOF( name: "foo" diff --git a/test/common/listener_manager/listener_manager_impl_test.h b/test/common/listener_manager/listener_manager_impl_test.h index 493534c6dd2a..51515bc636bc 100644 --- a/test/common/listener_manager/listener_manager_impl_test.h +++ b/test/common/listener_manager/listener_manager_impl_test.h @@ -469,6 +469,24 @@ class ListenerManagerImplTest : public testing::TestWithParam { std::shared_ptr internal_registry_{ std::make_shared()}; + class DumbRevConnRegistry : public Singleton::Instance, + public Network::RevConnRegistry { + public: + MOCK_METHOD(Network::LocalRevConnRegistry*, getLocalRegistry, ()); + MOCK_METHOD(absl::StatusOr, fromAnyConfig, + (const google::protobuf::Any& config)); + }; + + class DummyReverseConnectionListenerConfig : public Network::ReverseConnectionListenerConfig { + public: + MOCK_METHOD(Network::ReverseConnParamsPtr&, getReverseConnParams, ()); + MOCK_METHOD(Network::RevConnRegistry&, reverseConnRegistry, ()); + }; + + std::shared_ptr rev_conn_registry_{ + std::make_shared()}; + std::unique_ptr reverse_connection_listener_config_{ + std::make_unique()}; std::unique_ptr> listener_factory_ptr_; NiceMock& listener_factory_; NiceMock validation_visitor; diff --git a/test/extensions/filters/listener/common/fuzz/listener_filter_fuzzer.h b/test/extensions/filters/listener/common/fuzz/listener_filter_fuzzer.h index 5e4a666ad8ec..e986f5e3bdc0 100644 --- a/test/extensions/filters/listener/common/fuzz/listener_filter_fuzzer.h +++ b/test/extensions/filters/listener/common/fuzz/listener_filter_fuzzer.h @@ -53,6 +53,11 @@ class ListenerFilterWithDataFuzzer : public Network::ListenerConfig, return socket_factories_; } bool bindToPort() const override { return true; } + Network::ReverseConnectionListenerConfigOptRef + reverseConnectionListenerConfig() const override { + ENVOY_LOG(info, "Reverse connection config is not supported for ListenerFilterWithDataFuzzer."); + return Network::ReverseConnectionListenerConfigOptRef(); + } bool handOffRestoredDestinationConnections() const override { return false; } uint32_t perConnectionBufferLimitBytes() const override { return 0; } std::chrono::milliseconds listenerFiltersTimeout() const override { return {}; } @@ -61,6 +66,9 @@ class ListenerFilterWithDataFuzzer : public Network::ListenerConfig, uint64_t listenerTag() const override { return 1; } ResourceLimit& openConnections() override { return open_connections_; } const std::string& name() const override { return name_; } + const std::string& versionInfo() const override { + PANIC("Not implemented for ListenerFilterWithDataFuzzer."); + } Network::UdpListenerConfigOptRef udpListenerConfig() override { return {}; } Network::InternalListenerConfigOptRef internalListenerConfig() override { return {}; } const Network::ListenerInfoConstSharedPtr& listenerInfo() const override { diff --git a/test/mocks/http/mocks.h b/test/mocks/http/mocks.h index 34cbcf0fb022..2cdedbc1f0a2 100644 --- a/test/mocks/http/mocks.h +++ b/test/mocks/http/mocks.h @@ -340,6 +340,7 @@ class MockStreamDecoderFilterCallbacks : public StreamDecoderFilterCallbacks, MOCK_METHOD(absl::optional, upstreamOverrideHost, (), (const)); MOCK_METHOD(bool, shouldLoadShed, (), (const)); + MOCK_METHOD(void, setReverseConnForceLocalReply, (bool)); Buffer::InstancePtr buffer_; std::list callbacks_{}; diff --git a/test/mocks/network/connection.h b/test/mocks/network/connection.h index ba776f5f35c9..9847fe3f11e4 100644 --- a/test/mocks/network/connection.h +++ b/test/mocks/network/connection.h @@ -84,6 +84,9 @@ class MockConnectionBase { MOCK_METHOD(void, setBufferLimits, (uint32_t limit)); \ MOCK_METHOD(uint32_t, bufferLimit, (), (const)); \ MOCK_METHOD(bool, aboveHighWatermark, (), (const)); \ + MOCK_METHOD(const ConnectionSocketPtr&, getSocket, (), (const)); \ + MOCK_METHOD(void, setConnectionReused, (bool value)); \ + MOCK_METHOD(bool, isConnectionReused, ()); \ MOCK_METHOD(const Network::ConnectionSocket::OptionsSharedPtr&, socketOptions, (), (const)); \ MOCK_METHOD(StreamInfo::StreamInfo&, streamInfo, ()); \ MOCK_METHOD(const StreamInfo::StreamInfo&, streamInfo, (), (const)); \ diff --git a/test/mocks/network/mocks.h b/test/mocks/network/mocks.h index 9366bfc351b9..00b35e0a0366 100644 --- a/test/mocks/network/mocks.h +++ b/test/mocks/network/mocks.h @@ -212,7 +212,7 @@ class MockListenerFilter : public ListenerFilter { MOCK_METHOD(void, destroy_, ()); MOCK_METHOD(Network::FilterStatus, onAccept, (ListenerFilterCallbacks&)); MOCK_METHOD(Network::FilterStatus, onData, (Network::ListenerFilterBuffer&)); - + MOCK_METHOD(void, onClose, ()); size_t listener_filter_max_read_bytes_{0}; }; @@ -499,6 +499,8 @@ class MockListenerConfig : public ListenerConfig { MOCK_METHOD(const std::string&, name, (), (const)); MOCK_METHOD(Network::UdpListenerConfigOptRef, udpListenerConfig, ()); MOCK_METHOD(InternalListenerConfigOptRef, internalListenerConfig, ()); + MOCK_METHOD(ReverseConnectionListenerConfigOptRef, reverseConnectionListenerConfig, (), (const)); + MOCK_METHOD(const std::string&, versionInfo, (), (const)); MOCK_METHOD(ConnectionBalancer&, connectionBalancer, (const Network::Address::Instance&)); MOCK_METHOD(ResourceLimit&, openConnections, ()); MOCK_METHOD(uint32_t, tcpBacklogSize, (), (const)); @@ -522,6 +524,34 @@ class MockListenerConfig : public ListenerConfig { const AccessLog::InstanceSharedPtrVector empty_access_logs_; }; +class MockReverseConnectionListener : public ReverseConnectionListener { +public: + MOCK_METHOD(void, startRCWorkflow, (Event::Dispatcher& dispatcher, + Network::ConnectionHandler& conn_handler, + Network::ListenerConfig& config), ()); + MOCK_METHOD(void, onAccept, (ConnectionSocketPtr&& socket), ()); +}; + +class MockLocalRevConnRegistry : public LocalRevConnRegistry { +public: + MOCK_METHOD(Network::ReverseConnectionListenerPtr, createActiveReverseConnectionListener, + (Network::ConnectionHandler& conn_handler, Event::Dispatcher& dispatcher, + Network::ListenerConfig& config), ()); +}; + +class MockRevConnRegistry : public RevConnRegistry { +public: + MOCK_METHOD(LocalRevConnRegistry*, getLocalRegistry, ()); + MOCK_METHOD(absl::StatusOr, fromAnyConfig, + (const google::protobuf::Any& config), ()); +}; + +class MockReverseConnectionListenerConfig : public ReverseConnectionListenerConfig { +public: + MOCK_METHOD(ReverseConnParamsPtr&, getReverseConnParams, ()); + MOCK_METHOD(RevConnRegistry&, reverseConnRegistry, ()); +}; + class MockListener : public Listener { public: MockListener(); @@ -557,6 +587,9 @@ class MockConnectionHandler : public virtual ConnectionHandler { MOCK_METHOD(void, enableListeners, ()); MOCK_METHOD(void, setListenerRejectFraction, (UnitFloat), (override)); MOCK_METHOD(const std::string&, statPrefix, (), (const)); + MOCK_METHOD(Network::LocalRevConnRegistry&, reverseConnRegistry, (), (const)); + MOCK_METHOD(void, saveUpstreamConnection, + (Network::ConnectionSocketPtr&& upstream_socket, uint64_t listener_tag)); uint64_t num_handler_connections_{}; }; diff --git a/test/mocks/upstream/host.h b/test/mocks/upstream/host.h index 17b2fb085bf9..e2f2aa138cb0 100644 --- a/test/mocks/upstream/host.h +++ b/test/mocks/upstream/host.h @@ -106,6 +106,8 @@ class MockHostDescription : public HostDescription { MOCK_METHOD(void, priority, (uint32_t)); MOCK_METHOD(absl::optional, lastHcPassTime, (), (const)); MOCK_METHOD(void, setLastHcPassTime, (MonotonicTime last_hc_pass_time)); + MOCK_METHOD(const absl::string_view, getHostId, (), (const)); + MOCK_METHOD(void, setHostId, (const std::string host_id)); Stats::StatName localityZoneStatName() const override { locality_zone_stat_name_ = std::make_unique(locality().zone(), *symbol_table_); @@ -183,6 +185,8 @@ class MockHostLight : public Host { (), (const)); MOCK_METHOD(HealthCheckHostMonitor&, healthChecker, (), (const)); MOCK_METHOD(void, healthFlagClear, (HealthFlag flag)); + MOCK_METHOD(const absl::string_view, getHostId, (), (const)); + MOCK_METHOD(void, setHostId, (const std::string host_id)); MOCK_METHOD(bool, healthFlagGet, (HealthFlag flag), (const)); MOCK_METHOD(void, healthFlagSet, (HealthFlag flag)); MOCK_METHOD(uint32_t, healthFlagsGetAll, (), (const));