Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Envoy core changes for reverse connections #37368

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions envoy/event/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
#include "absl/functional/any_invocable.h"

namespace Envoy {
namespace Upstream {
class ClusterManager;
}
namespace Event {

/**
Expand Down Expand Up @@ -286,6 +289,28 @@ class Dispatcher : public DispatcherBase, public ScopeTracker {
* Shutdown the dispatcher by clear dispatcher thread deletable.
*/
virtual void shutdown() PURE;

/**
* Provides filters access to connection handler to save outgoing connections as
* incoming connections for reverse tunnels
*/
virtual void setConnectionHandler(Network::ConnectionHandler* connection_handler) PURE;

/**
* @return the Connection Handler.
*/
virtual Network::ConnectionHandler* connectionHandler() PURE;

/**
* Sets the dispatcher's cluster manager pointer.
* @param cluster_manager the upstream cluster manager object.
*/
virtual void setClusterManager(Upstream::ClusterManager* cluster_manager) PURE;

/**
* @return the cluster manager pointer.
*/
virtual Upstream::ClusterManager* getClusterManager() PURE;
};

using DispatcherPtr = std::unique_ptr<Dispatcher>;
Expand Down
20 changes: 20 additions & 0 deletions envoy/network/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ envoy_cc_library(
":connection_balancer_interface",
":listen_socket_interface",
":listener_interface",
":reverse_connection_handler_interface",
":reverse_connection_manager_interface",
"//envoy/common:random_generator_interface",
"//envoy/runtime:runtime_interface",
"//envoy/ssl:context_interface",
Expand Down Expand Up @@ -257,3 +259,21 @@ envoy_cc_library(
":address_interface",
],
)

envoy_cc_library(
name = "reverse_connection_manager_interface",
hdrs = ["reverse_connection_manager.h"],
deps = [
"//envoy/network:connection_interface",
"//envoy/network:listener_interface",
],
)

envoy_cc_library(
name = "reverse_connection_handler_interface",
hdrs = ["reverse_connection_handler.h"],
deps = [
":listen_socket_interface",
"//envoy/event:timer_interface",
],
)
21 changes: 21 additions & 0 deletions envoy/network/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,27 @@ class Connection : public Event::DeferredDeletable,
*/
virtual bool aboveHighWatermark() const PURE;

/**
* @return ConnectionSocketPtr& To get socket from current connection.
*/
virtual const ConnectionSocketPtr& getSocket() const PURE;

/**
* Set the flag connection_reused_ to value. The flag connection_reused_
* indicates whether the client connection is reused.
*/
virtual void setConnectionReused(bool value) PURE;

/**
* Set flag to convey active connection (listener) is reused.
*/
virtual void setActiveConnectionReused(bool value) PURE;

/**
* return boolean telling if active connection (listener) is reused.
*/
virtual bool isActiveConnectionReused() PURE;

/**
* Get the socket options set on this connection.
*/
Expand Down
42 changes: 42 additions & 0 deletions envoy/network/connection_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include "envoy/network/filter.h"
#include "envoy/network/listen_socket.h"
#include "envoy/network/listener.h"
#include "envoy/network/reverse_connection_handler.h"
#include "envoy/network/reverse_connection_manager.h"
#include "envoy/runtime/runtime.h"
#include "envoy/server/overload/thread_local_overload_state.h"
#include "envoy/ssl/context.h"
Expand All @@ -19,6 +21,26 @@
namespace Envoy {
namespace Network {

// The thread local registry.
class LocalRevConnRegistry {
public:
virtual ~LocalRevConnRegistry() = default;

virtual Network::ReverseConnectionManager& getRCManager() PURE;
virtual Network::ReverseConnectionHandler& getRCHandler() PURE;
};

// The central reverse conn registry interface providing the thread local accessor.
class RevConnRegistry {
public:
virtual ~RevConnRegistry() = default;

/**
* @return The thread local registry.
*/
virtual LocalRevConnRegistry* getLocalRegistry() PURE;
};

// This interface allows for a listener to perform an alternative behavior when a
// packet can't be routed correctly during draining; for example QUIC packets that
// are not for an existing connection.
Expand Down Expand Up @@ -127,6 +149,26 @@ class ConnectionHandler {
*/
virtual const std::string& statPrefix() const PURE;

/**
* Pass the reverse connection socket to the listener that initiated it.
* @param upstream_socket the socket to be passed.
* @param listener_tag the tag of the listener that initiated the reverse
* connection.
*/
virtual void saveUpstreamConnection(Network::ConnectionSocketPtr&& upstream_socket,
uint64_t listener_tag) PURE;

/**
* Enable reverse connection entities on the current worker.
* @param reverse_conn_registry the thread local registry that holds the reverse connection
* entities.
*/
virtual void enableReverseConnections(Network::RevConnRegistry& reverse_conn_registry) PURE;
/**
* @return the thread local registry.
*/
virtual Network::LocalRevConnRegistry& reverseConnRegistry() const PURE;

/**
* Used by ConnectionHandler to manage listeners.
*/
Expand Down
6 changes: 6 additions & 0 deletions envoy/network/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,12 @@ class ListenerFilter {
*/
virtual FilterStatus onData(Network::ListenerFilterBuffer& buffer) PURE;

/**
* Called when the connection is closed. Only the current filter that has stopped filter
* chain iteration will get the callback.
*/
virtual void onClose(){};

/**
* Return the size of data the filter want to inspect from the connection.
* The size can be increased after filter need to inspect more data.
Expand Down
2 changes: 1 addition & 1 deletion envoy/network/listen_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class ConnectionSocket : public virtual Socket {
virtual void dumpState(std::ostream& os, int indent_level = 0) const PURE;
};

using ConnectionSocketPtr = std::unique_ptr<ConnectionSocket>;
using ConnectionSocketPtr = std::shared_ptr<ConnectionSocket>;

} // namespace Network
} // namespace Envoy
39 changes: 39 additions & 0 deletions envoy/network/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,34 @@ class InternalListenerConfig {

using InternalListenerConfigOptRef = OptRef<InternalListenerConfig>;

class RevConnRegistry;

/**
* Configuration for a reverse connection listener.
*/
class ReverseConnectionListenerConfig {
public:
virtual ~ReverseConnectionListenerConfig() = default;

// The source node, cluster and tenant IDs of the local envoy. This is used when the listener is
// used to create reverse connections.
struct ReverseConnParams {
std::string src_node_id_;
std::string src_cluster_id_;
std::string src_tenant_id_;
absl::flat_hash_map<std::string, uint32_t> remote_cluster_to_conn_count_map_;
};
using ReverseConnParamsPtr = std::unique_ptr<ReverseConnParams>;
/**
* @return the private ReverseConnParams object, containing
* the params identifying the local envoy.
*/
virtual ReverseConnParamsPtr& getReverseConnParams() PURE;
};

using ReverseConnectionListenerConfigPtr = std::unique_ptr<ReverseConnectionListenerConfig>;
using ReverseConnectionListenerConfigOptRef = OptRef<ReverseConnectionListenerConfig>;

/**
* Description of the listener.
*/
Expand Down Expand Up @@ -260,6 +288,17 @@ class ListenerConfig {
*/
virtual InternalListenerConfigOptRef internalListenerConfig() PURE;

/**
* @return the reverse connection configuration for the listener IFF it is an reverse connection
* listener.
*/
virtual ReverseConnectionListenerConfigOptRef reverseConnectionListenerConfig() const PURE;

/**
* @return the listener's version.
*/
virtual const std::string& versionInfo() const PURE;

/**
* @param address is used for query the address specific connection balancer.
* @return the connection balancer for this listener. All listeners have a connection balancer,
Expand Down
131 changes: 131 additions & 0 deletions envoy/network/reverse_connection_handler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
#pragma once

#include <future>
#include <string>

#include "envoy/common/pure.h"
#include "envoy/event/timer.h"
#include "envoy/network/listen_socket.h"
#include "envoy/stats/scope.h"
#include "envoy/stats/stats_macros.h"

#include "absl/container/flat_hash_map.h"

namespace Envoy {
namespace Network {

/**
* All ReverseConnectionHandler stats. @see stats_macros.h
* This encompasses the stats for all accepted reverse connections by the responder envoy.
* The initiated reverse connections by the initiator envoy are logged by the RCManager.
*/
#define ALL_RCHANDLER_STATS(GAUGE) \
GAUGE(reverse_conn_cx_idle, NeverImport) \
GAUGE(reverse_conn_cx_used, NeverImport) \
GAUGE(reverse_conn_cx_total, NeverImport)

/**
* Struct definition for all ReverseConnectionHandler stats. @see stats_macros.h
*/
struct RCHandlerStats {
ALL_RCHANDLER_STATS(GENERATE_GAUGE_STRUCT)
};

using RCHandlerStatsPtr = std::unique_ptr<RCHandlerStats>;
using RCSocketPair = std::pair<Network::ConnectionSocketPtr, bool>;

/**
* class to store reverse connection sockets.
*/
class ReverseConnectionHandler {
public:
virtual ~ReverseConnectionHandler() = default;

/** Add the accepted connection and remote cluster mapping to RCHandler maps.
* @param node_id node_id of initiating node.
* @param cluster_id cluster_id of receiving(acceptor) cluster.
* @param socket the socket to be added.
* @param expects_proxy_protocol whether the proxy protocol header is expected. This is used
* in legacy versions.
* @param ping_interval the interval at which ping keepalives are sent on accepted reverse conns.
* @param rebalanced is true if we are adding to the socket after `pickMinHandler` is used
* to pick the most appropriate thread.
*/
virtual void
addConnectionSocket(const std::string& node_id, const std::string& cluster_id,
Network::ConnectionSocketPtr socket, bool expects_proxy_protocol,
const std::chrono::seconds& ping_interval = std::chrono::seconds::zero(),
bool rebalanced = false) PURE;

/** Add the accepted connection and remote cluster mapping to RCHandler maps
* through the thread local dispatcher.
* @param node_id node_id of initiating node.
* @param cluster_id cluster_id of receiving(acceptor) cluster.
* @param socket the socket to be added.
* @param expects_proxy_protocol whether the proxy protocol header is expected. This is used
* in legacy versions.
* @param ping_interval the interval at which ping keepalives are sent on accepted reverse conns.
*/
virtual void post(const std::string& node_id, const std::string& cluster_id,
Network::ConnectionSocketPtr socket, bool expects_proxy_protocol,
const std::chrono::seconds& ping_interval = std::chrono::seconds::zero()) PURE;

/** Called by the responder envoy when a request is received, that could be sent through a reverse
* connection. This returns an accepted connection socket, if present.
* @param key the remote cluster ID/ node ID.
* @param rebalanced is true if we are calling the function after `pickTargetHandler` is used
* to pick the most appropriate thread.
*/
virtual std::pair<Network::ConnectionSocketPtr, bool> getConnectionSocket(const std::string& key,
bool rebalanced) PURE;

/** Called by the responder envoy when the local worker does not have any accepted reverse
* connections for the key, to rebalance the request to a different worker and return the
* connection socket.
* @param key the remote cluster ID/ node ID.
* @param rebalanced is true if we are calling the function after `pickTargetHandler` is used
* to pick the most appropriate thread.
* @param socket_promise the promise to be set with the connection socket.
*/
virtual void rebalanceGetConnectionSocket(
const std::string& key, bool rebalanced,
std::shared_ptr<std::promise<Network::RCSocketPair>> socket_promise) PURE;

/**
* @return the number of reverse connections across all workers
* for the given node id.
*/
virtual size_t getNumberOfSocketsByNode(const std::string& node_id) PURE;

/**
* @return the number of reverse connections across all workers for
* the given cluster id.
*/
virtual size_t getNumberOfSocketsByCluster(const std::string& cluster_id) PURE;

using SocketCountMap = absl::flat_hash_map<std::string, size_t>;
/**
*
* @return the cluster -> reverse conn count mapping.
*/
virtual SocketCountMap getSocketCountMap() PURE;
/**
* Mark the connection socket dead and remove it from internal maps.
* @param fd the FD for the socket to be marked dead.
* @param used is true, when the connection the fd belongs to has been used by a cluster.
*/
virtual void markSocketDead(int fd, bool used) PURE;

/**
* Sets the stats scope for logging stats for accepted reverse connections
* with the local envoy as responder.
* @param scope the base scope to be used.
* @return the parent scope for RCHandler stats.
*/
virtual void initializeStats(Stats::Scope& scope) PURE;

virtual absl::flat_hash_map<std::string, size_t> getConnectionStats() PURE;
};

} // namespace Network
} // namespace Envoy
Loading