Skip to content

Commit

Permalink
do not copy ConnectionStats until we have to
Browse files Browse the repository at this point in the history
  • Loading branch information
brycekahle committed Aug 1, 2024
1 parent 946ce87 commit ba92af0
Show file tree
Hide file tree
Showing 15 changed files with 164 additions and 158 deletions.
11 changes: 11 additions & 0 deletions pkg/network/event_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,17 @@ func (c ConnectionStats) IsExpired(now uint64, timeout uint64) bool {
return c.LastUpdateEpoch+timeout <= now
}

// IsEmpty returns whether the connection has any statistics
func (c ConnectionStats) IsEmpty() bool {
// TODO why does this not include TCPEstablished and TCPClosed?
return c.Monotonic.RecvBytes == 0 &&
c.Monotonic.RecvPackets == 0 &&
c.Monotonic.SentBytes == 0 &&
c.Monotonic.SentPackets == 0 &&
c.Monotonic.Retransmits == 0 &&
len(c.TCPFailures) == 0
}

// ByteKey returns a unique key for this connection represented as a byte slice
// It's as following:
//
Expand Down
8 changes: 4 additions & 4 deletions pkg/network/netlink/conntracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ type Conntracker interface {
Describe(descs chan<- *prometheus.Desc)
// Collect returns the current state of all metrics of the collector
Collect(metrics chan<- prometheus.Metric)
GetTranslationForConn(network.ConnectionStats) *network.IPTranslation
GetTranslationForConn(*network.ConnectionStats) *network.IPTranslation
// GetType returns a string describing whether the conntracker is "ebpf" or "netlink"
GetType() string
DeleteTranslation(network.ConnectionStats)
DeleteTranslation(*network.ConnectionStats)
DumpCachedTable(context.Context) (map[uint32][]DebugConntrackEntry, error)
Close()
}
Expand Down Expand Up @@ -166,7 +166,7 @@ func (ctr *realConntracker) GetType() string {
return "netlink"
}

func (ctr *realConntracker) GetTranslationForConn(c network.ConnectionStats) *network.IPTranslation {
func (ctr *realConntracker) GetTranslationForConn(c *network.ConnectionStats) *network.IPTranslation {
then := time.Now()
defer func() {
conntrackerTelemetry.getsDuration.Observe(float64(time.Since(then).Nanoseconds()))
Expand Down Expand Up @@ -202,7 +202,7 @@ func (ctr *realConntracker) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(conntrackerTelemetry.orphanSize, prometheus.CounterValue, float64(ctr.cache.orphans.Len()))
}

func (ctr *realConntracker) DeleteTranslation(c network.ConnectionStats) {
func (ctr *realConntracker) DeleteTranslation(c *network.ConnectionStats) {
then := time.Now()

ctr.Lock()
Expand Down
2 changes: 1 addition & 1 deletion pkg/network/netlink/conntracker_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestConnTrackerCrossNamespaceAllNsDisabled(t *testing.T) {

time.Sleep(time.Second)
trans := ct.GetTranslationForConn(
network.ConnectionStats{
&network.ConnectionStats{
Source: util.AddressFromNetIP(laddr.IP),
SPort: uint16(laddr.Port),
Dest: util.AddressFromString("2.2.2.4"),
Expand Down
16 changes: 8 additions & 8 deletions pkg/network/netlink/conntracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestRegisterNonNat(t *testing.T) {

rt.register(c)
translation := rt.GetTranslationForConn(
network.ConnectionStats{
&network.ConnectionStats{
Source: util.AddressFromString("10.0.0.0"),
SPort: 8080,
Dest: util.AddressFromString("50.30.40.10"),
Expand All @@ -91,7 +91,7 @@ func TestRegisterNat(t *testing.T) {

rt.register(c)
translation := rt.GetTranslationForConn(
network.ConnectionStats{
&network.ConnectionStats{
Source: util.AddressFromString("10.0.0.0"),
SPort: 12345,
Dest: util.AddressFromString("50.30.40.10"),
Expand All @@ -108,7 +108,7 @@ func TestRegisterNat(t *testing.T) {
}, translation)

udpTranslation := rt.GetTranslationForConn(
network.ConnectionStats{
&network.ConnectionStats{
Source: util.AddressFromString("10.0.0.0"),
SPort: 12345,
Dest: util.AddressFromString("50.30.40.10"),
Expand All @@ -126,7 +126,7 @@ func TestRegisterNatUDP(t *testing.T) {

rt.register(c)
translation := rt.GetTranslationForConn(
network.ConnectionStats{
&network.ConnectionStats{
Source: util.AddressFromString("10.0.0.0"),
SPort: 12345,
Dest: util.AddressFromString("50.30.40.10"),
Expand All @@ -143,7 +143,7 @@ func TestRegisterNatUDP(t *testing.T) {
}, translation)

translation = rt.GetTranslationForConn(
network.ConnectionStats{
&network.ConnectionStats{
Source: util.AddressFromString("10.0.0.0"),
SPort: 12345,
Dest: util.AddressFromString("50.30.40.10"),
Expand All @@ -158,7 +158,7 @@ func TestTooManyEntries(t *testing.T) {
rt := newConntracker(2)

rt.register(makeTranslatedConn(netip.MustParseAddr("10.0.0.0"), netip.MustParseAddr("20.0.0.0"), netip.MustParseAddr("50.30.40.10"), 6, 12345, 80, 80))
tr := rt.GetTranslationForConn(network.ConnectionStats{
tr := rt.GetTranslationForConn(&network.ConnectionStats{
Source: util.AddressFromString("10.0.0.0"),
SPort: 12345,
Dest: util.AddressFromString("50.30.40.10"),
Expand All @@ -171,7 +171,7 @@ func TestTooManyEntries(t *testing.T) {

rt.register(makeTranslatedConn(netip.MustParseAddr("10.0.0.1"), netip.MustParseAddr("20.0.0.1"), netip.MustParseAddr("50.30.40.20"), 6, 12345, 80, 80))
// old entry should be gone
tr = rt.GetTranslationForConn(network.ConnectionStats{
tr = rt.GetTranslationForConn(&network.ConnectionStats{
Source: util.AddressFromString("10.0.0.0"),
SPort: 12345,
Dest: util.AddressFromString("50.30.40.10"),
Expand All @@ -181,7 +181,7 @@ func TestTooManyEntries(t *testing.T) {
require.Nil(t, tr)

// check new entry
tr = rt.GetTranslationForConn(network.ConnectionStats{
tr = rt.GetTranslationForConn(&network.ConnectionStats{
Source: util.AddressFromString("10.0.0.1"),
SPort: 12345,
Dest: util.AddressFromString("50.30.40.20"),
Expand Down
4 changes: 2 additions & 2 deletions pkg/network/netlink/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ func NewNoOpConntracker() Conntracker {
func (*noOpConntracker) GetType() string { return "" }

//nolint:revive // TODO(NET) Fix revive linter
func (*noOpConntracker) GetTranslationForConn(c network.ConnectionStats) *network.IPTranslation {
func (*noOpConntracker) GetTranslationForConn(c *network.ConnectionStats) *network.IPTranslation {
return nil
}

//nolint:revive // TODO(NET) Fix revive linter
func (*noOpConntracker) DeleteTranslation(c network.ConnectionStats) {
func (*noOpConntracker) DeleteTranslation(c *network.ConnectionStats) {

}

Expand Down
58 changes: 25 additions & 33 deletions pkg/network/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ type State interface {
// RemoveConnections removes the given keys from the state
RemoveConnections(conns []*ConnectionStats)

// StoreClosedConnections stores a batch of closed connections
StoreClosedConnections(connections []ConnectionStats)
// StoreClosedConnection stores a batch of closed connections
StoreClosedConnection(connection *ConnectionStats)

// GetStats returns a map of statistics about the current network state
GetStats() map[string]interface{}
Expand Down Expand Up @@ -166,32 +166,32 @@ type closedConnections struct {
// All empty connections are placed at the end. If it is not empty, it will be placed
// at the index of the first empty connection, and the first empty connection will be placed at the end.
// If there are no empty connections, it will be appended at the end.
func (cc *closedConnections) insert(c ConnectionStats, maxClosedConns uint32) {
func (cc *closedConnections) insert(c *ConnectionStats, maxClosedConns uint32) {
// If we have reached the limit, drop an empty connection
if uint32(len(cc.conns)) >= maxClosedConns {
stateTelemetry.closedConnDropped.IncWithTags(c.Type.Tags())
cc.dropEmpty(c)
return
}
// If the connection is empty append at the end
if isEmpty(c) {
cc.conns = append(cc.conns, c)
if c.IsEmpty() {
cc.conns = append(cc.conns, *c)
cc.byCookie[c.Cookie] = len(cc.conns) - 1
return
}

// Insert the connection before empty connections
if cc.emptyStart < len(cc.conns) {
emptyConn := cc.conns[cc.emptyStart]
cc.conns[cc.emptyStart] = c
cc.conns[cc.emptyStart] = *c
cc.conns = append(cc.conns, emptyConn)
cc.byCookie[c.Cookie] = cc.emptyStart
cc.byCookie[emptyConn.Cookie] = len(cc.conns) - 1
cc.emptyStart++
return
}
// If there are no empty connections, append at the end
cc.conns = append(cc.conns, c)
cc.conns = append(cc.conns, *c)
cc.byCookie[c.Cookie] = len(cc.conns) - 1
cc.emptyStart = len(cc.conns)
}
Expand All @@ -200,12 +200,12 @@ func (cc *closedConnections) insert(c ConnectionStats, maxClosedConns uint32) {
// This method drops the incoming connection if it's empty or there are no empty connections in conns.
// If neither of these conditions are true, it will drop the first empty connection and replace it with
// the incoming connection.
func (cc *closedConnections) dropEmpty(c ConnectionStats) {
if isEmpty(c) || cc.emptyStart == len(cc.conns) {
func (cc *closedConnections) dropEmpty(c *ConnectionStats) {
if c.IsEmpty() || cc.emptyStart == len(cc.conns) {
return
}
delete(cc.byCookie, cc.conns[cc.emptyStart].Cookie)
cc.conns[cc.emptyStart] = c
cc.conns[cc.emptyStart] = *c
cc.byCookie[c.Cookie] = cc.emptyStart
cc.emptyStart++
}
Expand All @@ -216,25 +216,25 @@ func (cc *closedConnections) dropEmpty(c ConnectionStats) {
// Otherwise it checks if the connection at i is empty and will be replaced with a non-empty conn.
// If this is true, it will replace the connection and move it to where the first empty conn is.
// If there isn't a change of state (both are empty or non-empty) it will simply replace the conn.
func (cc *closedConnections) replaceAt(i int, c ConnectionStats) {
func (cc *closedConnections) replaceAt(i int, c *ConnectionStats) {
// pick the latest one
if c.LastUpdateEpoch <= cc.conns[i].LastUpdateEpoch {
return
}
// If c is empty and connn[i] is not, do not replace
if isEmpty(c) && i < cc.emptyStart {
if c.IsEmpty() && i < cc.emptyStart {
return
}
// If conn[i] is empty and c is not, replace with the first empty connection
if !isEmpty(c) && i >= cc.emptyStart {
if !c.IsEmpty() && i >= cc.emptyStart {
cc.conns[cc.emptyStart], cc.conns[i] = cc.conns[i], cc.conns[cc.emptyStart]
cc.byCookie[cc.conns[i].Cookie] = i
cc.conns[cc.emptyStart] = c
cc.conns[cc.emptyStart] = *c
cc.byCookie[c.Cookie] = cc.emptyStart
cc.emptyStart++
return
}
cc.conns[i] = c
cc.conns[i] = *c
}

type client struct {
Expand Down Expand Up @@ -600,27 +600,25 @@ func (ns *networkState) mergeByCookie(conns []ConnectionStats) ([]ConnectionStat
return conns, connsByKey
}

// StoreClosedConnections wraps the unexported method while locking state
func (ns *networkState) StoreClosedConnections(closed []ConnectionStats) {
// StoreClosedConnection wraps the unexported method while locking state
func (ns *networkState) StoreClosedConnection(closed *ConnectionStats) {
ns.Lock()
defer ns.Unlock()

ns.storeClosedConnections(closed)
ns.storeClosedConnection(closed)
}

// storeClosedConnection stores the given connection for every client
func (ns *networkState) storeClosedConnections(conns []ConnectionStats) {
func (ns *networkState) storeClosedConnection(c *ConnectionStats) {
for _, client := range ns.clients {
for _, c := range conns {
if i, ok := client.closed.byCookie[c.Cookie]; ok {
if ns.mergeConnectionStats(&client.closed.conns[i], &c) {
stateTelemetry.statsCookieCollisions.Inc()
client.closed.replaceAt(i, c)
}
continue
if i, ok := client.closed.byCookie[c.Cookie]; ok {
if ns.mergeConnectionStats(&client.closed.conns[i], c) {
stateTelemetry.statsCookieCollisions.Inc()
client.closed.replaceAt(i, c)
}
client.closed.insert(c, ns.maxClosedConns)
continue
}
client.closed.insert(c, ns.maxClosedConns)
}
}

Expand Down Expand Up @@ -1496,9 +1494,3 @@ func (ns *networkState) mergeConnectionStats(a, b *ConnectionStats) (collision b

return false
}

func isEmpty(conn ConnectionStats) bool {
return conn.Monotonic.RecvBytes == 0 && conn.Monotonic.RecvPackets == 0 &&
conn.Monotonic.SentBytes == 0 && conn.Monotonic.SentPackets == 0 &&
conn.Monotonic.Retransmits == 0 && len(conn.TCPFailures) == 0
}
Loading

0 comments on commit ba92af0

Please sign in to comment.