diff --git a/pkg/network/event_common.go b/pkg/network/event_common.go index 8d25e14b53152..77e956f20c90e 100644 --- a/pkg/network/event_common.go +++ b/pkg/network/event_common.go @@ -302,6 +302,17 @@ func (c ConnectionStats) IsExpired(now uint64, timeout uint64) bool { return c.LastUpdateEpoch+timeout <= now } +// IsEmpty returns whether the connection has any statistics +func (c ConnectionStats) IsEmpty() bool { + // TODO why does this not include TCPEstablished and TCPClosed? + return c.Monotonic.RecvBytes == 0 && + c.Monotonic.RecvPackets == 0 && + c.Monotonic.SentBytes == 0 && + c.Monotonic.SentPackets == 0 && + c.Monotonic.Retransmits == 0 && + len(c.TCPFailures) == 0 +} + // ByteKey returns a unique key for this connection represented as a byte slice // It's as following: // diff --git a/pkg/network/netlink/conntracker.go b/pkg/network/netlink/conntracker.go index 8d7cc720e52d4..af4314b800cbd 100644 --- a/pkg/network/netlink/conntracker.go +++ b/pkg/network/netlink/conntracker.go @@ -45,10 +45,10 @@ type Conntracker interface { Describe(descs chan<- *prometheus.Desc) // Collect returns the current state of all metrics of the collector Collect(metrics chan<- prometheus.Metric) - GetTranslationForConn(network.ConnectionStats) *network.IPTranslation + GetTranslationForConn(*network.ConnectionStats) *network.IPTranslation // GetType returns a string describing whether the conntracker is "ebpf" or "netlink" GetType() string - DeleteTranslation(network.ConnectionStats) + DeleteTranslation(*network.ConnectionStats) DumpCachedTable(context.Context) (map[uint32][]DebugConntrackEntry, error) Close() } @@ -166,7 +166,7 @@ func (ctr *realConntracker) GetType() string { return "netlink" } -func (ctr *realConntracker) GetTranslationForConn(c network.ConnectionStats) *network.IPTranslation { +func (ctr *realConntracker) GetTranslationForConn(c *network.ConnectionStats) *network.IPTranslation { then := time.Now() defer func() { conntrackerTelemetry.getsDuration.Observe(float64(time.Since(then).Nanoseconds())) @@ -202,7 +202,7 @@ func (ctr *realConntracker) Collect(ch chan<- prometheus.Metric) { ch <- prometheus.MustNewConstMetric(conntrackerTelemetry.orphanSize, prometheus.CounterValue, float64(ctr.cache.orphans.Len())) } -func (ctr *realConntracker) DeleteTranslation(c network.ConnectionStats) { +func (ctr *realConntracker) DeleteTranslation(c *network.ConnectionStats) { then := time.Now() ctr.Lock() diff --git a/pkg/network/netlink/conntracker_integration_test.go b/pkg/network/netlink/conntracker_integration_test.go index 05c9293447717..2e92ad5fa21dc 100644 --- a/pkg/network/netlink/conntracker_integration_test.go +++ b/pkg/network/netlink/conntracker_integration_test.go @@ -59,7 +59,7 @@ func TestConnTrackerCrossNamespaceAllNsDisabled(t *testing.T) { time.Sleep(time.Second) trans := ct.GetTranslationForConn( - network.ConnectionStats{ + &network.ConnectionStats{ Source: util.AddressFromNetIP(laddr.IP), SPort: uint16(laddr.Port), Dest: util.AddressFromString("2.2.2.4"), diff --git a/pkg/network/netlink/conntracker_test.go b/pkg/network/netlink/conntracker_test.go index 0a88454ca1f34..c0dffccab8573 100644 --- a/pkg/network/netlink/conntracker_test.go +++ b/pkg/network/netlink/conntracker_test.go @@ -74,7 +74,7 @@ func TestRegisterNonNat(t *testing.T) { rt.register(c) translation := rt.GetTranslationForConn( - network.ConnectionStats{ + &network.ConnectionStats{ Source: util.AddressFromString("10.0.0.0"), SPort: 8080, Dest: util.AddressFromString("50.30.40.10"), @@ -91,7 +91,7 @@ func TestRegisterNat(t *testing.T) { rt.register(c) translation := rt.GetTranslationForConn( - network.ConnectionStats{ + &network.ConnectionStats{ Source: util.AddressFromString("10.0.0.0"), SPort: 12345, Dest: util.AddressFromString("50.30.40.10"), @@ -108,7 +108,7 @@ func TestRegisterNat(t *testing.T) { }, translation) udpTranslation := rt.GetTranslationForConn( - network.ConnectionStats{ + &network.ConnectionStats{ Source: util.AddressFromString("10.0.0.0"), SPort: 12345, Dest: util.AddressFromString("50.30.40.10"), @@ -126,7 +126,7 @@ func TestRegisterNatUDP(t *testing.T) { rt.register(c) translation := rt.GetTranslationForConn( - network.ConnectionStats{ + &network.ConnectionStats{ Source: util.AddressFromString("10.0.0.0"), SPort: 12345, Dest: util.AddressFromString("50.30.40.10"), @@ -143,7 +143,7 @@ func TestRegisterNatUDP(t *testing.T) { }, translation) translation = rt.GetTranslationForConn( - network.ConnectionStats{ + &network.ConnectionStats{ Source: util.AddressFromString("10.0.0.0"), SPort: 12345, Dest: util.AddressFromString("50.30.40.10"), @@ -158,7 +158,7 @@ func TestTooManyEntries(t *testing.T) { rt := newConntracker(2) rt.register(makeTranslatedConn(netip.MustParseAddr("10.0.0.0"), netip.MustParseAddr("20.0.0.0"), netip.MustParseAddr("50.30.40.10"), 6, 12345, 80, 80)) - tr := rt.GetTranslationForConn(network.ConnectionStats{ + tr := rt.GetTranslationForConn(&network.ConnectionStats{ Source: util.AddressFromString("10.0.0.0"), SPort: 12345, Dest: util.AddressFromString("50.30.40.10"), @@ -171,7 +171,7 @@ func TestTooManyEntries(t *testing.T) { rt.register(makeTranslatedConn(netip.MustParseAddr("10.0.0.1"), netip.MustParseAddr("20.0.0.1"), netip.MustParseAddr("50.30.40.20"), 6, 12345, 80, 80)) // old entry should be gone - tr = rt.GetTranslationForConn(network.ConnectionStats{ + tr = rt.GetTranslationForConn(&network.ConnectionStats{ Source: util.AddressFromString("10.0.0.0"), SPort: 12345, Dest: util.AddressFromString("50.30.40.10"), @@ -181,7 +181,7 @@ func TestTooManyEntries(t *testing.T) { require.Nil(t, tr) // check new entry - tr = rt.GetTranslationForConn(network.ConnectionStats{ + tr = rt.GetTranslationForConn(&network.ConnectionStats{ Source: util.AddressFromString("10.0.0.1"), SPort: 12345, Dest: util.AddressFromString("50.30.40.20"), diff --git a/pkg/network/netlink/noop.go b/pkg/network/netlink/noop.go index 37bf13c74628b..840043ff950c5 100644 --- a/pkg/network/netlink/noop.go +++ b/pkg/network/netlink/noop.go @@ -26,12 +26,12 @@ func NewNoOpConntracker() Conntracker { func (*noOpConntracker) GetType() string { return "" } //nolint:revive // TODO(NET) Fix revive linter -func (*noOpConntracker) GetTranslationForConn(c network.ConnectionStats) *network.IPTranslation { +func (*noOpConntracker) GetTranslationForConn(c *network.ConnectionStats) *network.IPTranslation { return nil } //nolint:revive // TODO(NET) Fix revive linter -func (*noOpConntracker) DeleteTranslation(c network.ConnectionStats) { +func (*noOpConntracker) DeleteTranslation(c *network.ConnectionStats) { } diff --git a/pkg/network/state.go b/pkg/network/state.go index 5b9a91c1bb99e..e8aea2b0a5f2b 100644 --- a/pkg/network/state.go +++ b/pkg/network/state.go @@ -114,8 +114,8 @@ type State interface { // RemoveConnections removes the given keys from the state RemoveConnections(conns []*ConnectionStats) - // StoreClosedConnections stores a batch of closed connections - StoreClosedConnections(connections []ConnectionStats) + // StoreClosedConnection stores a batch of closed connections + StoreClosedConnection(connection *ConnectionStats) // GetStats returns a map of statistics about the current network state GetStats() map[string]interface{} @@ -166,7 +166,7 @@ type closedConnections struct { // All empty connections are placed at the end. If it is not empty, it will be placed // at the index of the first empty connection, and the first empty connection will be placed at the end. // If there are no empty connections, it will be appended at the end. -func (cc *closedConnections) insert(c ConnectionStats, maxClosedConns uint32) { +func (cc *closedConnections) insert(c *ConnectionStats, maxClosedConns uint32) { // If we have reached the limit, drop an empty connection if uint32(len(cc.conns)) >= maxClosedConns { stateTelemetry.closedConnDropped.IncWithTags(c.Type.Tags()) @@ -174,8 +174,8 @@ func (cc *closedConnections) insert(c ConnectionStats, maxClosedConns uint32) { return } // If the connection is empty append at the end - if isEmpty(c) { - cc.conns = append(cc.conns, c) + if c.IsEmpty() { + cc.conns = append(cc.conns, *c) cc.byCookie[c.Cookie] = len(cc.conns) - 1 return } @@ -183,7 +183,7 @@ func (cc *closedConnections) insert(c ConnectionStats, maxClosedConns uint32) { // Insert the connection before empty connections if cc.emptyStart < len(cc.conns) { emptyConn := cc.conns[cc.emptyStart] - cc.conns[cc.emptyStart] = c + cc.conns[cc.emptyStart] = *c cc.conns = append(cc.conns, emptyConn) cc.byCookie[c.Cookie] = cc.emptyStart cc.byCookie[emptyConn.Cookie] = len(cc.conns) - 1 @@ -191,7 +191,7 @@ func (cc *closedConnections) insert(c ConnectionStats, maxClosedConns uint32) { return } // If there are no empty connections, append at the end - cc.conns = append(cc.conns, c) + cc.conns = append(cc.conns, *c) cc.byCookie[c.Cookie] = len(cc.conns) - 1 cc.emptyStart = len(cc.conns) } @@ -200,12 +200,12 @@ func (cc *closedConnections) insert(c ConnectionStats, maxClosedConns uint32) { // This method drops the incoming connection if it's empty or there are no empty connections in conns. // If neither of these conditions are true, it will drop the first empty connection and replace it with // the incoming connection. -func (cc *closedConnections) dropEmpty(c ConnectionStats) { - if isEmpty(c) || cc.emptyStart == len(cc.conns) { +func (cc *closedConnections) dropEmpty(c *ConnectionStats) { + if c.IsEmpty() || cc.emptyStart == len(cc.conns) { return } delete(cc.byCookie, cc.conns[cc.emptyStart].Cookie) - cc.conns[cc.emptyStart] = c + cc.conns[cc.emptyStart] = *c cc.byCookie[c.Cookie] = cc.emptyStart cc.emptyStart++ } @@ -216,25 +216,25 @@ func (cc *closedConnections) dropEmpty(c ConnectionStats) { // Otherwise it checks if the connection at i is empty and will be replaced with a non-empty conn. // If this is true, it will replace the connection and move it to where the first empty conn is. // If there isn't a change of state (both are empty or non-empty) it will simply replace the conn. -func (cc *closedConnections) replaceAt(i int, c ConnectionStats) { +func (cc *closedConnections) replaceAt(i int, c *ConnectionStats) { // pick the latest one if c.LastUpdateEpoch <= cc.conns[i].LastUpdateEpoch { return } // If c is empty and connn[i] is not, do not replace - if isEmpty(c) && i < cc.emptyStart { + if c.IsEmpty() && i < cc.emptyStart { return } // If conn[i] is empty and c is not, replace with the first empty connection - if !isEmpty(c) && i >= cc.emptyStart { + if !c.IsEmpty() && i >= cc.emptyStart { cc.conns[cc.emptyStart], cc.conns[i] = cc.conns[i], cc.conns[cc.emptyStart] cc.byCookie[cc.conns[i].Cookie] = i - cc.conns[cc.emptyStart] = c + cc.conns[cc.emptyStart] = *c cc.byCookie[c.Cookie] = cc.emptyStart cc.emptyStart++ return } - cc.conns[i] = c + cc.conns[i] = *c } type client struct { @@ -600,27 +600,25 @@ func (ns *networkState) mergeByCookie(conns []ConnectionStats) ([]ConnectionStat return conns, connsByKey } -// StoreClosedConnections wraps the unexported method while locking state -func (ns *networkState) StoreClosedConnections(closed []ConnectionStats) { +// StoreClosedConnection wraps the unexported method while locking state +func (ns *networkState) StoreClosedConnection(closed *ConnectionStats) { ns.Lock() defer ns.Unlock() - ns.storeClosedConnections(closed) + ns.storeClosedConnection(closed) } // storeClosedConnection stores the given connection for every client -func (ns *networkState) storeClosedConnections(conns []ConnectionStats) { +func (ns *networkState) storeClosedConnection(c *ConnectionStats) { for _, client := range ns.clients { - for _, c := range conns { - if i, ok := client.closed.byCookie[c.Cookie]; ok { - if ns.mergeConnectionStats(&client.closed.conns[i], &c) { - stateTelemetry.statsCookieCollisions.Inc() - client.closed.replaceAt(i, c) - } - continue + if i, ok := client.closed.byCookie[c.Cookie]; ok { + if ns.mergeConnectionStats(&client.closed.conns[i], c) { + stateTelemetry.statsCookieCollisions.Inc() + client.closed.replaceAt(i, c) } - client.closed.insert(c, ns.maxClosedConns) + continue } + client.closed.insert(c, ns.maxClosedConns) } } @@ -1496,9 +1494,3 @@ func (ns *networkState) mergeConnectionStats(a, b *ConnectionStats) (collision b return false } - -func isEmpty(conn ConnectionStats) bool { - return conn.Monotonic.RecvBytes == 0 && conn.Monotonic.RecvPackets == 0 && - conn.Monotonic.SentBytes == 0 && conn.Monotonic.SentPackets == 0 && - conn.Monotonic.Retransmits == 0 && len(conn.TCPFailures) == 0 -} diff --git a/pkg/network/state_test.go b/pkg/network/state_test.go index ad5a45d9cc33b..6cda66f67d158 100644 --- a/pkg/network/state_test.go +++ b/pkg/network/state_test.go @@ -95,7 +95,7 @@ func BenchmarkConnectionsGet(b *testing.B) { ns.GetDelta(DEBUGCLIENT, latestTime.Load(), nil, nil, nil) for _, c := range closed[:bench.closedCount] { - ns.StoreClosedConnections([]ConnectionStats{c}) + ns.StoreClosedConnection(&c) } b.ResetTimer() @@ -174,7 +174,7 @@ func TestRetrieveClosedConnection(t *testing.T) { t.Run("without prior registration", func(t *testing.T) { state := newDefaultState() - state.StoreClosedConnections([]ConnectionStats{conn}) + state.StoreClosedConnection(&conn) conns := state.GetDelta(clientID, latestEpochTime(), nil, nil, nil).Conns assert.Equal(t, 0, len(conns)) @@ -185,7 +185,7 @@ func TestRetrieveClosedConnection(t *testing.T) { state.RegisterClient(clientID) - state.StoreClosedConnections([]ConnectionStats{conn}) + state.StoreClosedConnection(&conn) conns := state.GetDelta(clientID, latestEpochTime(), nil, nil, nil).Conns assert.Equal(t, 1, len(conns)) @@ -275,9 +275,9 @@ func TestDropEmptyConnections(t *testing.T) { state.maxClosedConns = 1 state.RegisterClient(clientID) - state.storeClosedConnections([]ConnectionStats{{}}) + state.storeClosedConnection(&ConnectionStats{}) - state.storeClosedConnections([]ConnectionStats{conn}) + state.storeClosedConnection(&conn) conns := state.clients[clientID].closed.conns _, ok := state.clients[clientID].closed.byCookie[0] @@ -293,9 +293,9 @@ func TestDropEmptyConnections(t *testing.T) { state.maxClosedConns = 1 state.RegisterClient(clientID) - state.storeClosedConnections([]ConnectionStats{conn}) + state.storeClosedConnection(&conn) - state.storeClosedConnections([]ConnectionStats{{}}) + state.storeClosedConnection(&ConnectionStats{}) conns := state.clients[clientID].closed.conns _, ok := state.clients[clientID].closed.byCookie[0] @@ -311,11 +311,11 @@ func TestDropEmptyConnections(t *testing.T) { state.maxClosedConns = 1 state.RegisterClient(clientID) - state.storeClosedConnections([]ConnectionStats{conn}) + state.storeClosedConnection(&conn) conn2 := conn conn2.Cookie = 2 - state.storeClosedConnections([]ConnectionStats{conn2}) + state.storeClosedConnection(&conn2) conns := state.clients[clientID].closed.conns _, ok := state.clients[clientID].closed.byCookie[2] @@ -330,12 +330,12 @@ func TestDropEmptyConnections(t *testing.T) { state.maxClosedConns = 5 state.RegisterClient(clientID) - state.storeClosedConnections([]ConnectionStats{{}}) - state.storeClosedConnections([]ConnectionStats{conn}) + state.storeClosedConnection(&ConnectionStats{}) + state.storeClosedConnection(&conn) emptyconn := ConnectionStats{} emptyconn.Cookie = 2 - state.storeClosedConnections([]ConnectionStats{emptyconn}) + state.storeClosedConnection(&emptyconn) conns := state.clients[clientID].closed.conns @@ -346,7 +346,7 @@ func TestDropEmptyConnections(t *testing.T) { conn2 := conn conn2.Cookie = 2 conn2.LastUpdateEpoch = 100 - state.storeClosedConnections([]ConnectionStats{conn2}) + state.storeClosedConnection(&conn2) // Check that the index changed conns = state.clients[clientID].closed.conns @@ -358,13 +358,13 @@ func TestDropEmptyConnections(t *testing.T) { state.maxClosedConns = 5 state.RegisterClient(clientID) - state.storeClosedConnections([]ConnectionStats{{}}) - state.storeClosedConnections([]ConnectionStats{conn}) + state.storeClosedConnection(&ConnectionStats{}) + state.storeClosedConnection(&conn) // Send non-empty connection conn2 := conn conn2.Cookie = 2 - state.storeClosedConnections([]ConnectionStats{conn2}) + state.storeClosedConnection(&conn2) conns := state.clients[clientID].closed.conns @@ -375,7 +375,7 @@ func TestDropEmptyConnections(t *testing.T) { emptyconn := ConnectionStats{} emptyconn.Cookie = 2 emptyconn.LastUpdateEpoch = 100 - state.storeClosedConnections([]ConnectionStats{emptyconn}) + state.storeClosedConnection(&conn2) // Check that the index stayed the same conns = state.clients[clientID].closed.conns @@ -387,14 +387,14 @@ func TestDropEmptyConnections(t *testing.T) { state.maxClosedConns = 5 state.RegisterClient(clientID) - state.storeClosedConnections([]ConnectionStats{{}}) - state.storeClosedConnections([]ConnectionStats{conn}) + state.storeClosedConnection(&ConnectionStats{}) + state.storeClosedConnection(&conn) // Send non-empty connection conn2 := conn conn2.Cookie = 2 conn2.LastUpdateEpoch = 100 - state.storeClosedConnections([]ConnectionStats{conn2}) + state.storeClosedConnection(&conn2) conns := state.clients[clientID].closed.conns @@ -404,7 +404,7 @@ func TestDropEmptyConnections(t *testing.T) { // Send empty connection with same cookie emptyconn := ConnectionStats{} emptyconn.Cookie = 2 - state.storeClosedConnections([]ConnectionStats{emptyconn}) + state.storeClosedConnection(&conn2) // Check that the index stayed the same conns = state.clients[clientID].closed.conns @@ -416,13 +416,13 @@ func TestDropEmptyConnections(t *testing.T) { state.maxClosedConns = 5 state.RegisterClient(clientID) - state.storeClosedConnections([]ConnectionStats{{}}) - state.storeClosedConnections([]ConnectionStats{conn}) + state.storeClosedConnection(&ConnectionStats{}) + state.storeClosedConnection(&conn) // Send non-empty connection conn2 := conn conn2.Cookie = 2 - state.storeClosedConnections([]ConnectionStats{conn2}) + state.storeClosedConnection(&conn2) conns := state.clients[clientID].closed.conns @@ -439,7 +439,7 @@ func TestDropEmptyConnections(t *testing.T) { RecvBytes: 3333, Retransmits: 4, } - state.storeClosedConnections([]ConnectionStats{conn3}) + state.storeClosedConnection(&conn3) // Check that the index stayed the same conns = state.clients[clientID].closed.conns @@ -451,8 +451,8 @@ func TestDropEmptyConnections(t *testing.T) { state.maxClosedConns = 5 state.RegisterClient(clientID) - state.storeClosedConnections([]ConnectionStats{conn}) - state.storeClosedConnections([]ConnectionStats{{}}) + state.storeClosedConnection(&conn) + state.storeClosedConnection(&ConnectionStats{}) conns := state.clients[clientID].closed.conns emptyConnStart := state.clients[clientID].closed.emptyStart @@ -466,8 +466,8 @@ func TestDropEmptyConnections(t *testing.T) { state.maxClosedConns = 5 state.RegisterClient(clientID) - state.storeClosedConnections([]ConnectionStats{{}}) - state.storeClosedConnections([]ConnectionStats{conn}) + state.storeClosedConnection(&ConnectionStats{}) + state.storeClosedConnection(&conn) conns := state.clients[clientID].closed.conns emptyConnStart := state.clients[clientID].closed.emptyStart @@ -481,11 +481,11 @@ func TestDropEmptyConnections(t *testing.T) { state.maxClosedConns = 5 state.RegisterClient(clientID) - state.storeClosedConnections([]ConnectionStats{conn}) + state.storeClosedConnection(&conn) conn2 := conn conn2.Cookie = 2 - state.storeClosedConnections([]ConnectionStats{conn2}) + state.storeClosedConnection(&conn2) conns := state.clients[clientID].closed.conns emptyConnStart := state.clients[clientID].closed.emptyStart @@ -742,7 +742,7 @@ func TestLastStatsForClosedConnection(t *testing.T) { assert.Equal(t, conn.Monotonic.RecvBytes, conns[0].Monotonic.RecvBytes) assert.Equal(t, conn.Monotonic.Retransmits, conns[0].Monotonic.Retransmits) - state.StoreClosedConnections([]ConnectionStats{conn2}) + state.StoreClosedConnection(&conn2) // We should have one connection with last stats conns = state.GetDelta(clientID, latestEpochTime(), nil, nil, nil).Conns @@ -845,7 +845,7 @@ func TestSameKeyEdgeCases(t *testing.T) { assert.Equal(t, 0, len(conns)) // Store the connection as closed - state.StoreClosedConnections([]ConnectionStats{conn}) + state.StoreClosedConnection(&conn) // Second get, we should have monotonic and last stats = 3 conns = state.GetDelta(client, latestEpochTime(), nil, nil, nil).Conns @@ -880,14 +880,14 @@ func TestSameKeyEdgeCases(t *testing.T) { assert.Equal(t, 0, len(conns)) // Store the connection as closed - state.StoreClosedConnections([]ConnectionStats{conn}) + state.StoreClosedConnection(&conn) conn2 := conn conn2.Cookie = 2 conn2.Monotonic = StatCounters{SentBytes: 5} conn2.LastUpdateEpoch++ // Store the connection another time - state.StoreClosedConnections([]ConnectionStats{conn2}) + state.StoreClosedConnection(&conn2) // Second get, we should have monotonic and last stats = 8 conns = state.GetDelta(client, latestEpochTime(), nil, nil, nil).Conns @@ -947,7 +947,7 @@ func TestSameKeyEdgeCases(t *testing.T) { // Store the connection as closed conn.Monotonic.SentBytes++ conn.LastUpdateEpoch = latestEpochTime() - state.StoreClosedConnections([]ConnectionStats{conn}) + state.StoreClosedConnection(&conn) conn2 := conn conn2.Monotonic.SentBytes = 1 @@ -965,7 +965,7 @@ func TestSameKeyEdgeCases(t *testing.T) { conn2.Monotonic.SentBytes++ conn.LastUpdateEpoch = latestEpochTime() // Store the connection as closed - state.StoreClosedConnections([]ConnectionStats{conn2}) + state.StoreClosedConnection(&conn2) conns = state.GetDelta(client, latestEpochTime(), nil, nil, nil).Conns require.Len(t, conns, 1) @@ -1000,7 +1000,7 @@ func TestSameKeyEdgeCases(t *testing.T) { assert.Equal(t, 0, len(conns)) // Store the connection as closed - state.StoreClosedConnections([]ConnectionStats{conn}) + state.StoreClosedConnection(&conn) conn2 := conn conn2.Cookie = 2 @@ -1021,7 +1021,7 @@ func TestSameKeyEdgeCases(t *testing.T) { // Store the connection as closed conn2.Monotonic.SentBytes += 3 conn2.LastUpdateEpoch++ - state.StoreClosedConnections([]ConnectionStats{conn2}) + state.StoreClosedConnection(&conn2) // Store the connection again conn3 := conn2 @@ -1041,7 +1041,7 @@ func TestSameKeyEdgeCases(t *testing.T) { // Store the connection as closed conn3.Monotonic.SentBytes += 2 - state.StoreClosedConnections([]ConnectionStats{conn3}) + state.StoreClosedConnection(&conn3) // 4th get, we should have monotonic = 3 and last stats = 2 conns = state.GetDelta(client, latestEpochTime(), nil, nil, nil).Conns @@ -1088,7 +1088,7 @@ func TestSameKeyEdgeCases(t *testing.T) { // Store the connection as closed conn2 := conn conn2.Monotonic = StatCounters{SentBytes: 8} - state.StoreClosedConnections([]ConnectionStats{conn2}) + state.StoreClosedConnection(&conn2) // Second get, we should have monotonic = 8 and last stats = 5 conns = state.GetDelta(client, latestEpochTime(), nil, nil, nil).Conns @@ -1143,7 +1143,7 @@ func TestSameKeyEdgeCases(t *testing.T) { assert.Equal(t, 0, len(conns)) // Store the connection as closed - state.StoreClosedConnections([]ConnectionStats{conn}) + state.StoreClosedConnection(&conn) // Second get for client d we should have monotonic and last stats = 3 conns = state.GetDelta(clientD, latestEpochTime(), nil, nil, nil).Conns @@ -1183,7 +1183,7 @@ func TestSameKeyEdgeCases(t *testing.T) { // Store the connection as closed conn2.Monotonic.SentBytes += 2 conn2.LastUpdateEpoch++ - state.StoreClosedConnections([]ConnectionStats{conn2}) + state.StoreClosedConnection(&conn2) // Store the connection again conn3 := conn2 @@ -1216,7 +1216,7 @@ func TestSameKeyEdgeCases(t *testing.T) { // Store the connection as closed conn3.Monotonic.SentBytes++ conn3.LastUpdateEpoch++ - state.StoreClosedConnections([]ConnectionStats{conn3}) + state.StoreClosedConnection(&conn3) // 4th get, for client c we should have monotonic = 3 and last stats = 2 conns = state.GetDelta(client, latestEpochTime(), nil, nil, nil).Conns @@ -1307,7 +1307,7 @@ func TestSameKeyEdgeCases(t *testing.T) { // Store the connection as closed conn.Monotonic.SentBytes++ conn.LastUpdateEpoch++ - state.StoreClosedConnections([]ConnectionStats{conn}) + state.StoreClosedConnection(&conn) // Second get for client d we should have monotonic and last stats = 3 conns = state.GetDelta(clientD, latestEpochTime(), nil, nil, nil).Conns @@ -1354,7 +1354,7 @@ func TestSameKeyEdgeCases(t *testing.T) { // Store the connection as closed conn2.Monotonic.SentBytes += 2 conn2.LastUpdateEpoch++ - state.StoreClosedConnections([]ConnectionStats{conn2}) + state.StoreClosedConnection(&conn2) // 4th get, for client e we should have monotonic = 5 and last stats = 5 conns = state.GetDelta(clientE, latestEpochTime(), nil, nil, nil).Conns @@ -1498,9 +1498,9 @@ func TestDoubleCloseOnTwoClients(t *testing.T) { state.RegisterClient(client2) // Store the closed connection twice - state.StoreClosedConnections([]ConnectionStats{conn}) + state.StoreClosedConnection(&conn) conn.LastUpdateEpoch++ - state.StoreClosedConnections([]ConnectionStats{conn}) + state.StoreClosedConnection(&conn) // Get the connections for client1 we should have only one with stats counted only once conns := state.GetDelta(client1, latestEpochTime(), nil, nil, nil).Conns @@ -1535,7 +1535,7 @@ func TestUnorderedCloseEvent(t *testing.T) { conn.LastUpdateEpoch = latestEpochTime() + 1 conn.Monotonic.SentBytes++ conn.Monotonic.RecvBytes = 1 - state.StoreClosedConnections([]ConnectionStats{conn}) + state.StoreClosedConnection(&conn) conn.LastUpdateEpoch-- conn.Monotonic.SentBytes-- @@ -1553,7 +1553,7 @@ func TestUnorderedCloseEvent(t *testing.T) { // Simulate having the connection getting active again conn.LastUpdateEpoch = latestEpochTime() conn.Monotonic.SentBytes-- - state.StoreClosedConnections([]ConnectionStats{conn}) + state.StoreClosedConnection(&conn) conns = state.GetDelta(client, latestEpochTime(), nil, nil, nil).Conns require.Len(t, conns, 1) @@ -1583,13 +1583,13 @@ func TestAggregateClosedConnectionsTimestamp(t *testing.T) { state.RegisterClient(client) conn.LastUpdateEpoch = latestEpochTime() - state.StoreClosedConnections([]ConnectionStats{conn}) + state.StoreClosedConnection(&conn) conn.LastUpdateEpoch = latestEpochTime() - state.StoreClosedConnections([]ConnectionStats{conn}) + state.StoreClosedConnection(&conn) conn.LastUpdateEpoch = latestEpochTime() - state.StoreClosedConnections([]ConnectionStats{conn}) + state.StoreClosedConnection(&conn) // Make sure the connections we get has the latest timestamp delta := state.GetDelta(client, latestEpochTime(), nil, nil, nil) @@ -1793,7 +1793,7 @@ func testHTTPStatsWithMultipleClients(t *testing.T, aggregateByStatusCode bool) // Store the connection to both clients & pass HTTP stats to the first client c.LastUpdateEpoch = latestEpochTime() - state.StoreClosedConnections([]ConnectionStats{c}) + state.StoreClosedConnection(&c) delta := state.GetDelta(client1, latestEpochTime(), nil, nil, getStats("/testpath")) assert.Len(t, delta.HTTP, 1) @@ -1807,7 +1807,7 @@ func testHTTPStatsWithMultipleClients(t *testing.T, aggregateByStatusCode bool) assert.Len(t, delta.HTTP, 0) c.LastUpdateEpoch = latestEpochTime() - state.StoreClosedConnections([]ConnectionStats{c}) + state.StoreClosedConnection(&c) // Pass in new HTTP stats to the first client delta = state.GetDelta(client1, latestEpochTime(), nil, nil, getStats("/testpath2")) @@ -1865,7 +1865,7 @@ func testHTTP2StatsWithMultipleClients(t *testing.T, aggregateByStatusCode bool) // Store the connection to both clients & pass HTTP2 stats to the first client c.LastUpdateEpoch = latestEpochTime() - state.StoreClosedConnections([]ConnectionStats{c}) + state.StoreClosedConnection(&c) delta := state.GetDelta(client1, latestEpochTime(), nil, nil, getStats("/testpath")) assert.Len(t, delta.HTTP2, 1) @@ -1879,7 +1879,7 @@ func testHTTP2StatsWithMultipleClients(t *testing.T, aggregateByStatusCode bool) assert.Len(t, delta.HTTP2, 0) c.LastUpdateEpoch = latestEpochTime() - state.StoreClosedConnections([]ConnectionStats{c}) + state.StoreClosedConnection(&c) // Pass in new HTTP2 stats to the first client delta = state.GetDelta(client1, latestEpochTime(), nil, nil, getStats("/testpath2")) @@ -2224,8 +2224,8 @@ func TestClosedMergingWithAddressCollision(t *testing.T) { state := newDefaultState() state.RegisterClient(client) - state.StoreClosedConnections([]ConnectionStats{c1}) - state.StoreClosedConnections([]ConnectionStats{c2}) + state.StoreClosedConnection(&c1) + state.StoreClosedConnection(&c2) active := ConnectionStats{ Pid: 123, @@ -2288,7 +2288,7 @@ func TestClosedMergingWithAddressCollision(t *testing.T) { // time a connection is seen _ = state.GetDelta(client, latestEpochTime(), []ConnectionStats{c1}, nil, nil) c2.Cookie = c1.Cookie - state.StoreClosedConnections([]ConnectionStats{c2}) + state.StoreClosedConnection(&c2) // assert that the value returned by the second call to `GetDelta` represents c2 - c1 delta := state.GetDelta(client, latestEpochTime(), nil, nil, nil) @@ -2367,7 +2367,7 @@ func TestKafkaStatsWithMultipleClients(t *testing.T) { // Store the connection to both clients & pass HTTP stats to the first client c.LastUpdateEpoch = latestEpochTime() - state.StoreClosedConnections([]ConnectionStats{c}) + state.StoreClosedConnection(&c) delta := state.GetDelta(client1, latestEpochTime(), nil, nil, getStats("my-topic")) assert.Len(t, delta.Kafka, 1) @@ -2381,7 +2381,7 @@ func TestKafkaStatsWithMultipleClients(t *testing.T) { assert.Len(t, delta.Kafka, 0) c.LastUpdateEpoch = latestEpochTime() - state.StoreClosedConnections([]ConnectionStats{c}) + state.StoreClosedConnection(&c) // Pass in new Kafka stats to the first client delta = state.GetDelta(client1, latestEpochTime(), nil, nil, getStats("my-topic")) diff --git a/pkg/network/tracer/connection/tcp_close_consumer.go b/pkg/network/tracer/connection/tcp_close_consumer.go index 8cb2e8dfaf587..3bb67443ee54c 100644 --- a/pkg/network/tracer/connection/tcp_close_consumer.go +++ b/pkg/network/tracer/connection/tcp_close_consumer.go @@ -88,7 +88,7 @@ func (c *tcpCloseConsumer) extractConn(data []byte) { updateTCPStats(conn, &ct.Tcp_stats, ct.Tcp_retransmits) } -func (c *tcpCloseConsumer) Start(callback func([]network.ConnectionStats)) { +func (c *tcpCloseConsumer) Start(callback func(*network.ConnectionStats)) { if c == nil { return } @@ -135,7 +135,10 @@ func (c *tcpCloseConsumer) Start(callback func([]network.ConnectionStats)) { closeConsumerTelemetry.perfReceived.Add(float64(c.buffer.Len())) closedCount += uint64(c.buffer.Len()) - callback(c.buffer.Connections()) + conns := c.buffer.Connections() + for i := range conns { + callback(&conns[i]) + } c.buffer.Reset() batchData.Done() // lost events only occur when using perf buffers @@ -148,7 +151,10 @@ func (c *tcpCloseConsumer) Start(callback func([]network.ConnectionStats)) { case request := <-c.requests: oneTimeBuffer := network.NewConnectionBuffer(32, 32) c.batchManager.GetPendingConns(oneTimeBuffer) - callback(oneTimeBuffer.Connections()) + conns := oneTimeBuffer.Connections() + for i := range conns { + callback(&conns[i]) + } close(request) closedCount += uint64(oneTimeBuffer.Len()) diff --git a/pkg/network/tracer/connection/tracer.go b/pkg/network/tracer/connection/tracer.go index e308327e38de5..3e9588aacadd4 100644 --- a/pkg/network/tracer/connection/tracer.go +++ b/pkg/network/tracer/connection/tracer.go @@ -66,7 +66,7 @@ const ( // Tracer is the common interface implemented by all connection tracers. type Tracer interface { // Start begins collecting network connection data. - Start(func([]network.ConnectionStats)) error + Start(func(*network.ConnectionStats)) error // Stop halts all network data collection. Stop() // GetConnections returns the list of currently active connections, using the buffer provided. @@ -338,7 +338,7 @@ func boolConst(name string, value bool) manager.ConstantEditor { return c } -func (t *tracer) Start(callback func([]network.ConnectionStats)) (err error) { +func (t *tracer) Start(callback func(*network.ConnectionStats)) (err error) { defer func() { if err != nil { t.Stop() diff --git a/pkg/network/tracer/conntracker_test.go b/pkg/network/tracer/conntracker_test.go index 0a87d75a4f342..3fbc698c70b10 100644 --- a/pkg/network/tracer/conntracker_test.go +++ b/pkg/network/tracer/conntracker_test.go @@ -157,7 +157,7 @@ func testConntracker(t *testing.T, serverIP, clientIP net.IP, ct netlink.Conntra NetNS: curNs, } require.Eventually(t, func() bool { - trans = ct.GetTranslationForConn(cs) + trans = ct.GetTranslationForConn(&cs) return trans != nil }, 5*time.Second, 100*time.Millisecond, "timed out waiting for TCP NAT conntrack entry for %s", cs.String()) assert.Equal(t, util.AddressFromNetIP(serverIP), trans.ReplSrcIP) @@ -173,7 +173,7 @@ func testConntracker(t *testing.T, serverIP, clientIP net.IP, ct netlink.Conntra Type: network.TCP, NetNS: curNs, } - trans = ct.GetTranslationForConn(cs) + trans = ct.GetTranslationForConn(&cs) assert.Nil(t, trans) }) @@ -199,7 +199,7 @@ func testConntracker(t *testing.T, serverIP, clientIP net.IP, ct netlink.Conntra NetNS: curNs, } require.Eventually(t, func() bool { - trans = ct.GetTranslationForConn(cs) + trans = ct.GetTranslationForConn(&cs) return trans != nil }, 5*time.Second, 100*time.Millisecond, "timed out waiting for UDP NAT conntrack entry for %s", cs.String()) assert.Equal(t, util.AddressFromNetIP(serverIP), trans.ReplSrcIP) @@ -230,7 +230,7 @@ func testConntrackerCrossNamespace(t *testing.T, ct netlink.Conntracker) { NetNS: testIno, } require.Eventually(t, func() bool { - trans = ct.GetTranslationForConn(cs) + trans = ct.GetTranslationForConn(&cs) return trans != nil }, 5*time.Second, 100*time.Millisecond, "timed out waiting for conntrack entry for %s", cs.String()) @@ -286,7 +286,7 @@ func testConntrackerCrossNamespaceNATonRoot(t *testing.T, ct netlink.Conntracker NetNS: testIno, } require.Eventually(t, func() bool { - trans = ct.GetTranslationForConn(cs) + trans = ct.GetTranslationForConn(&cs) return trans != nil }, 5*time.Second, 100*time.Millisecond, "timed out waiting for conntrack entry for %s", cs.String()) diff --git a/pkg/network/tracer/ebpf_conntracker.go b/pkg/network/tracer/ebpf_conntracker.go index 5a2f2a7a6ed2b..0f71b2f949c88 100644 --- a/pkg/network/tracer/ebpf_conntracker.go +++ b/pkg/network/tracer/ebpf_conntracker.go @@ -228,12 +228,12 @@ func (e *ebpfConntracker) GetType() string { return "ebpf" } -func (e *ebpfConntracker) GetTranslationForConn(stats network.ConnectionStats) *network.IPTranslation { +func (e *ebpfConntracker) GetTranslationForConn(stats *network.ConnectionStats) *network.IPTranslation { start := time.Now() src := tuplePool.Get() defer tuplePool.Put(src) - toConntrackTupleFromStats(src, &stats) + toConntrackTupleFromStats(src, stats) if log.ShouldLog(seelog.TraceLvl) { log.Tracef("looking up in conntrack (stats): %s", stats) } @@ -301,11 +301,11 @@ func (e *ebpfConntracker) delete(key *netebpf.ConntrackTuple) { } } -func (e *ebpfConntracker) DeleteTranslation(stats network.ConnectionStats) { +func (e *ebpfConntracker) DeleteTranslation(stats *network.ConnectionStats) { key := tuplePool.Get() defer tuplePool.Put(key) - toConntrackTupleFromStats(key, &stats) + toConntrackTupleFromStats(key, stats) dst := e.get(key) e.delete(key) diff --git a/pkg/network/tracer/testutil/conntrack.go b/pkg/network/tracer/testutil/conntrack.go index 6c0d2ed14f772..23b6938006208 100644 --- a/pkg/network/tracer/testutil/conntrack.go +++ b/pkg/network/tracer/testutil/conntrack.go @@ -32,7 +32,7 @@ func NewDelayedConntracker(ctr netlink.Conntracker, numDelays int) netlink.Connt } } -func (ctr *delayedConntracker) GetTranslationForConn(c network.ConnectionStats) *network.IPTranslation { +func (ctr *delayedConntracker) GetTranslationForConn(c *network.ConnectionStats) *network.IPTranslation { ctr.mux.Lock() defer ctr.mux.Unlock() diff --git a/pkg/network/tracer/tracer.go b/pkg/network/tracer/tracer.go index 648119a0eb87a..db5f0bc95a1a9 100644 --- a/pkg/network/tracer/tracer.go +++ b/pkg/network/tracer/tracer.go @@ -222,7 +222,7 @@ func newTracer(cfg *config.Config, telemetryComponent telemetryComponent.Compone // start starts the tracer. This function is present to separate // the creation from the start of the tracer for tests func (t *Tracer) start() error { - err := t.ebpfTracer.Start(t.storeClosedConnections) + err := t.ebpfTracer.Start(t.storeClosedConnection) if err != nil { t.Stop() return fmt.Errorf("could not start ebpf tracer: %s", err) @@ -290,37 +290,30 @@ func newReverseDNS(c *config.Config, telemetrycomp telemetryComponent.Component) return rdns } -// storeClosedConnections is triggered when: +// storeClosedConnection is triggered when: // * the current closed connection batch fills up // * the client asks for the current connections // this function is responsible for storing the closed connections in the state and // matching failed connections to closed connections -func (t *Tracer) storeClosedConnections(connections []network.ConnectionStats) { - var rejected int - for i := range connections { - cs := &connections[i] - cs.IsClosed = true - if t.shouldSkipConnection(cs) { - connections[rejected], connections[i] = connections[i], connections[rejected] - rejected++ - tracerTelemetry.skippedConns.IncWithTags(cs.Type.Tags()) - continue - } +func (t *Tracer) storeClosedConnection(cs *network.ConnectionStats) { + cs.IsClosed = true + if t.shouldSkipConnection(cs) { + tracerTelemetry.skippedConns.IncWithTags(cs.Type.Tags()) + return + } - cs.IPTranslation = t.conntracker.GetTranslationForConn(*cs) - t.connVia(cs) - if cs.IPTranslation != nil { - t.conntracker.DeleteTranslation(*cs) - } + cs.IPTranslation = t.conntracker.GetTranslationForConn(cs) + t.connVia(cs) + if cs.IPTranslation != nil { + t.conntracker.DeleteTranslation(cs) + } - t.addProcessInfo(cs) + t.addProcessInfo(cs) - tracerTelemetry.closedConns.IncWithTags(cs.Type.Tags()) - t.ebpfTracer.GetFailedConnections().MatchFailedConn(cs) - } + tracerTelemetry.closedConns.IncWithTags(cs.Type.Tags()) + t.ebpfTracer.GetFailedConnections().MatchFailedConn(cs) - connections = connections[rejected:] - t.state.StoreClosedConnections(connections) + t.state.StoreClosedConnection(cs) } func (t *Tracer) addProcessInfo(c *network.ConnectionStats) { @@ -542,7 +535,7 @@ func (t *Tracer) getConnections(activeBuffer *network.ConnectionBuffer) (latestU activeConnections = activeBuffer.Connections() for i := range activeConnections { - activeConnections[i].IPTranslation = t.conntracker.GetTranslationForConn(activeConnections[i]) + activeConnections[i].IPTranslation = t.conntracker.GetTranslationForConn(&activeConnections[i]) // do gateway resolution only on active connections outside // the map iteration loop to not add to connections while // iterating (leads to ever-increasing connections in the map, @@ -595,7 +588,7 @@ func (t *Tracer) removeEntries(entries []network.ConnectionStats) { } // Delete conntrack entry for this connection - t.conntracker.DeleteTranslation(*entry) + t.conntracker.DeleteTranslation(entry) // Append the connection key to the keys to remove from the userspace state toRemove = append(toRemove, entry) diff --git a/pkg/network/tracer/tracer_linux_test.go b/pkg/network/tracer/tracer_linux_test.go index f526dab90a00a..c8d9674a40a48 100644 --- a/pkg/network/tracer/tracer_linux_test.go +++ b/pkg/network/tracer/tracer_linux_test.go @@ -434,7 +434,7 @@ func (s *TracerSuite) TestConntrackExpiration() { if !assert.True(collect, ok, "connection not found") { return } - assert.NotNil(collect, tr.conntracker.GetTranslationForConn(*conn), "connection does not have NAT translation") + assert.NotNil(collect, tr.conntracker.GetTranslationForConn(conn), "connection does not have NAT translation") }, 3*time.Second, 100*time.Millisecond, "failed to find connection translation") // This will force the connection to be expired next time we call getConnections, but @@ -443,7 +443,7 @@ func (s *TracerSuite) TestConntrackExpiration() { tr.config.TCPConnTimeout = time.Duration(-1) _ = getConnections(t, tr) - assert.NotNil(t, tr.conntracker.GetTranslationForConn(*conn), "translation should not have been deleted") + assert.NotNil(t, tr.conntracker.GetTranslationForConn(conn), "translation should not have been deleted") // delete the connection from system conntrack cmd := exec.Command("conntrack", "-D", "-s", c.LocalAddr().(*net.TCPAddr).IP.String(), "-d", c.RemoteAddr().(*net.TCPAddr).IP.String(), "-p", "tcp") @@ -451,7 +451,7 @@ func (s *TracerSuite) TestConntrackExpiration() { require.NoError(t, err, "conntrack delete failed, output: %s", out) _ = getConnections(t, tr) - assert.Nil(t, tr.conntracker.GetTranslationForConn(*conn), "translation should have been deleted") + assert.Nil(t, tr.conntracker.GetTranslationForConn(conn), "translation should have been deleted") // write newline so server connections will exit _, err = c.Write([]byte("\n")) @@ -493,7 +493,7 @@ func (s *TracerSuite) TestConntrackDelays() { require.Eventually(t, func() bool { connections := getConnections(t, tr) conn, ok := findConnection(c.LocalAddr(), c.RemoteAddr(), connections) - return ok && tr.conntracker.GetTranslationForConn(*conn) != nil + return ok && tr.conntracker.GetTranslationForConn(conn) != nil }, 3*time.Second, 100*time.Millisecond, "failed to find connection with translation") // write newline so server connections will exit @@ -542,7 +542,7 @@ func (s *TracerSuite) TestTranslationBindingRegression() { Type: network.TCP, } require.Eventually(t, func() bool { - return tr.conntracker.GetTranslationForConn(cs) != nil + return tr.conntracker.GetTranslationForConn(&cs) != nil }, 3*time.Second, 100*time.Millisecond, "timed out waiting for conntrack update") // Assert that the connection to 2.2.2.2 has an IPTranslation object bound to it diff --git a/pkg/network/tracer/tracer_windows.go b/pkg/network/tracer/tracer_windows.go index 3933e6a619f6a..4e39b66739f8e 100644 --- a/pkg/network/tracer/tracer_windows.go +++ b/pkg/network/tracer/tracer_windows.go @@ -136,7 +136,9 @@ func NewTracer(config *config.Config, telemetry telemetry.Component) (*Tracer, e }) closedConnStats := tr.closedBuffer.Connections() - tr.state.StoreClosedConnections(closedConnStats) + for i := range closedConnStats { + tr.state.StoreClosedConnection(&closedConnStats[i]) + } case windows.WAIT_FAILED: break waitloop @@ -198,7 +200,9 @@ func (t *Tracer) GetActiveConnections(clientID string) (*network.Connections, er // check for expired clients in the state t.state.RemoveExpiredClients(time.Now()) - t.state.StoreClosedConnections(closedConnStats) + for i := range closedConnStats { + t.state.StoreClosedConnection(&closedConnStats[i]) + } var delta network.Delta if t.usmMonitor != nil { //nolint