Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

do not copy ConnectionStats until we have to #28173

Merged
merged 1 commit into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions pkg/network/event_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,17 @@ func (c ConnectionStats) IsExpired(now uint64, timeout uint64) bool {
return c.LastUpdateEpoch+timeout <= now
}

// IsEmpty returns whether the connection has any statistics
func (c ConnectionStats) IsEmpty() bool {
// TODO why does this not include TCPEstablished and TCPClosed?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no objection with adding those fields here, do you @hmahmood?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

happy to do that in a different PR though

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually we may set the closed flags on otherwise totally empty connections in kernelspace... so this could end up returning false a lot more

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function was mostly checking for whether the connection exchanged any data, and almost exclusively for code that is storing closed connections in state.go. As opposed to something like IsZero which does check for those other two fields.

return c.Monotonic.RecvBytes == 0 &&
c.Monotonic.RecvPackets == 0 &&
c.Monotonic.SentBytes == 0 &&
c.Monotonic.SentPackets == 0 &&
c.Monotonic.Retransmits == 0 &&
len(c.TCPFailures) == 0
}

// ByteKey returns a unique key for this connection represented as a byte slice
// It's as following:
//
Expand Down
8 changes: 4 additions & 4 deletions pkg/network/netlink/conntracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ type Conntracker interface {
Describe(descs chan<- *prometheus.Desc)
// Collect returns the current state of all metrics of the collector
Collect(metrics chan<- prometheus.Metric)
GetTranslationForConn(network.ConnectionStats) *network.IPTranslation
GetTranslationForConn(*network.ConnectionStats) *network.IPTranslation
// GetType returns a string describing whether the conntracker is "ebpf" or "netlink"
GetType() string
DeleteTranslation(network.ConnectionStats)
DeleteTranslation(*network.ConnectionStats)
DumpCachedTable(context.Context) (map[uint32][]DebugConntrackEntry, error)
Close()
}
Expand Down Expand Up @@ -166,7 +166,7 @@ func (ctr *realConntracker) GetType() string {
return "netlink"
}

func (ctr *realConntracker) GetTranslationForConn(c network.ConnectionStats) *network.IPTranslation {
func (ctr *realConntracker) GetTranslationForConn(c *network.ConnectionStats) *network.IPTranslation {
then := time.Now()
defer func() {
conntrackerTelemetry.getsDuration.Observe(float64(time.Since(then).Nanoseconds()))
Expand Down Expand Up @@ -202,7 +202,7 @@ func (ctr *realConntracker) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(conntrackerTelemetry.orphanSize, prometheus.CounterValue, float64(ctr.cache.orphans.Len()))
}

func (ctr *realConntracker) DeleteTranslation(c network.ConnectionStats) {
func (ctr *realConntracker) DeleteTranslation(c *network.ConnectionStats) {
then := time.Now()

ctr.Lock()
Expand Down
2 changes: 1 addition & 1 deletion pkg/network/netlink/conntracker_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestConnTrackerCrossNamespaceAllNsDisabled(t *testing.T) {

time.Sleep(time.Second)
trans := ct.GetTranslationForConn(
network.ConnectionStats{
&network.ConnectionStats{
Source: util.AddressFromNetIP(laddr.IP),
SPort: uint16(laddr.Port),
Dest: util.AddressFromString("2.2.2.4"),
Expand Down
16 changes: 8 additions & 8 deletions pkg/network/netlink/conntracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestRegisterNonNat(t *testing.T) {

rt.register(c)
translation := rt.GetTranslationForConn(
network.ConnectionStats{
&network.ConnectionStats{
Source: util.AddressFromString("10.0.0.0"),
SPort: 8080,
Dest: util.AddressFromString("50.30.40.10"),
Expand All @@ -91,7 +91,7 @@ func TestRegisterNat(t *testing.T) {

rt.register(c)
translation := rt.GetTranslationForConn(
network.ConnectionStats{
&network.ConnectionStats{
Source: util.AddressFromString("10.0.0.0"),
SPort: 12345,
Dest: util.AddressFromString("50.30.40.10"),
Expand All @@ -108,7 +108,7 @@ func TestRegisterNat(t *testing.T) {
}, translation)

udpTranslation := rt.GetTranslationForConn(
network.ConnectionStats{
&network.ConnectionStats{
Source: util.AddressFromString("10.0.0.0"),
SPort: 12345,
Dest: util.AddressFromString("50.30.40.10"),
Expand All @@ -126,7 +126,7 @@ func TestRegisterNatUDP(t *testing.T) {

rt.register(c)
translation := rt.GetTranslationForConn(
network.ConnectionStats{
&network.ConnectionStats{
Source: util.AddressFromString("10.0.0.0"),
SPort: 12345,
Dest: util.AddressFromString("50.30.40.10"),
Expand All @@ -143,7 +143,7 @@ func TestRegisterNatUDP(t *testing.T) {
}, translation)

translation = rt.GetTranslationForConn(
network.ConnectionStats{
&network.ConnectionStats{
Source: util.AddressFromString("10.0.0.0"),
SPort: 12345,
Dest: util.AddressFromString("50.30.40.10"),
Expand All @@ -158,7 +158,7 @@ func TestTooManyEntries(t *testing.T) {
rt := newConntracker(2)

rt.register(makeTranslatedConn(netip.MustParseAddr("10.0.0.0"), netip.MustParseAddr("20.0.0.0"), netip.MustParseAddr("50.30.40.10"), 6, 12345, 80, 80))
tr := rt.GetTranslationForConn(network.ConnectionStats{
tr := rt.GetTranslationForConn(&network.ConnectionStats{
Source: util.AddressFromString("10.0.0.0"),
SPort: 12345,
Dest: util.AddressFromString("50.30.40.10"),
Expand All @@ -171,7 +171,7 @@ func TestTooManyEntries(t *testing.T) {

rt.register(makeTranslatedConn(netip.MustParseAddr("10.0.0.1"), netip.MustParseAddr("20.0.0.1"), netip.MustParseAddr("50.30.40.20"), 6, 12345, 80, 80))
// old entry should be gone
tr = rt.GetTranslationForConn(network.ConnectionStats{
tr = rt.GetTranslationForConn(&network.ConnectionStats{
Source: util.AddressFromString("10.0.0.0"),
SPort: 12345,
Dest: util.AddressFromString("50.30.40.10"),
Expand All @@ -181,7 +181,7 @@ func TestTooManyEntries(t *testing.T) {
require.Nil(t, tr)

// check new entry
tr = rt.GetTranslationForConn(network.ConnectionStats{
tr = rt.GetTranslationForConn(&network.ConnectionStats{
Source: util.AddressFromString("10.0.0.1"),
SPort: 12345,
Dest: util.AddressFromString("50.30.40.20"),
Expand Down
4 changes: 2 additions & 2 deletions pkg/network/netlink/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ func NewNoOpConntracker() Conntracker {
func (*noOpConntracker) GetType() string { return "" }

//nolint:revive // TODO(NET) Fix revive linter
func (*noOpConntracker) GetTranslationForConn(c network.ConnectionStats) *network.IPTranslation {
func (*noOpConntracker) GetTranslationForConn(c *network.ConnectionStats) *network.IPTranslation {
return nil
}

//nolint:revive // TODO(NET) Fix revive linter
func (*noOpConntracker) DeleteTranslation(c network.ConnectionStats) {
func (*noOpConntracker) DeleteTranslation(c *network.ConnectionStats) {

}

Expand Down
58 changes: 25 additions & 33 deletions pkg/network/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ type State interface {
// RemoveConnections removes the given keys from the state
RemoveConnections(conns []*ConnectionStats)

// StoreClosedConnections stores a batch of closed connections
StoreClosedConnections(connections []ConnectionStats)
// StoreClosedConnection stores a batch of closed connections
StoreClosedConnection(connection *ConnectionStats)

// GetStats returns a map of statistics about the current network state
GetStats() map[string]interface{}
Expand Down Expand Up @@ -166,32 +166,32 @@ type closedConnections struct {
// All empty connections are placed at the end. If it is not empty, it will be placed
// at the index of the first empty connection, and the first empty connection will be placed at the end.
// If there are no empty connections, it will be appended at the end.
func (cc *closedConnections) insert(c ConnectionStats, maxClosedConns uint32) {
func (cc *closedConnections) insert(c *ConnectionStats, maxClosedConns uint32) {
// If we have reached the limit, drop an empty connection
if uint32(len(cc.conns)) >= maxClosedConns {
stateTelemetry.closedConnDropped.IncWithTags(c.Type.Tags())
cc.dropEmpty(c)
return
}
// If the connection is empty append at the end
if isEmpty(c) {
cc.conns = append(cc.conns, c)
if c.IsEmpty() {
cc.conns = append(cc.conns, *c)
cc.byCookie[c.Cookie] = len(cc.conns) - 1
return
}

// Insert the connection before empty connections
if cc.emptyStart < len(cc.conns) {
emptyConn := cc.conns[cc.emptyStart]
cc.conns[cc.emptyStart] = c
cc.conns[cc.emptyStart] = *c
cc.conns = append(cc.conns, emptyConn)
cc.byCookie[c.Cookie] = cc.emptyStart
cc.byCookie[emptyConn.Cookie] = len(cc.conns) - 1
cc.emptyStart++
return
}
// If there are no empty connections, append at the end
cc.conns = append(cc.conns, c)
cc.conns = append(cc.conns, *c)
cc.byCookie[c.Cookie] = len(cc.conns) - 1
cc.emptyStart = len(cc.conns)
}
Expand All @@ -200,12 +200,12 @@ func (cc *closedConnections) insert(c ConnectionStats, maxClosedConns uint32) {
// This method drops the incoming connection if it's empty or there are no empty connections in conns.
// If neither of these conditions are true, it will drop the first empty connection and replace it with
// the incoming connection.
func (cc *closedConnections) dropEmpty(c ConnectionStats) {
if isEmpty(c) || cc.emptyStart == len(cc.conns) {
func (cc *closedConnections) dropEmpty(c *ConnectionStats) {
if c.IsEmpty() || cc.emptyStart == len(cc.conns) {
return
}
delete(cc.byCookie, cc.conns[cc.emptyStart].Cookie)
cc.conns[cc.emptyStart] = c
cc.conns[cc.emptyStart] = *c
cc.byCookie[c.Cookie] = cc.emptyStart
cc.emptyStart++
}
Expand All @@ -216,25 +216,25 @@ func (cc *closedConnections) dropEmpty(c ConnectionStats) {
// Otherwise it checks if the connection at i is empty and will be replaced with a non-empty conn.
// If this is true, it will replace the connection and move it to where the first empty conn is.
// If there isn't a change of state (both are empty or non-empty) it will simply replace the conn.
func (cc *closedConnections) replaceAt(i int, c ConnectionStats) {
func (cc *closedConnections) replaceAt(i int, c *ConnectionStats) {
// pick the latest one
if c.LastUpdateEpoch <= cc.conns[i].LastUpdateEpoch {
return
}
// If c is empty and connn[i] is not, do not replace
if isEmpty(c) && i < cc.emptyStart {
if c.IsEmpty() && i < cc.emptyStart {
return
}
// If conn[i] is empty and c is not, replace with the first empty connection
if !isEmpty(c) && i >= cc.emptyStart {
if !c.IsEmpty() && i >= cc.emptyStart {
cc.conns[cc.emptyStart], cc.conns[i] = cc.conns[i], cc.conns[cc.emptyStart]
cc.byCookie[cc.conns[i].Cookie] = i
cc.conns[cc.emptyStart] = c
cc.conns[cc.emptyStart] = *c
cc.byCookie[c.Cookie] = cc.emptyStart
cc.emptyStart++
return
}
cc.conns[i] = c
cc.conns[i] = *c
}

type client struct {
Expand Down Expand Up @@ -600,27 +600,25 @@ func (ns *networkState) mergeByCookie(conns []ConnectionStats) ([]ConnectionStat
return conns, connsByKey
}

// StoreClosedConnections wraps the unexported method while locking state
func (ns *networkState) StoreClosedConnections(closed []ConnectionStats) {
// StoreClosedConnection wraps the unexported method while locking state
func (ns *networkState) StoreClosedConnection(closed *ConnectionStats) {
ns.Lock()
defer ns.Unlock()

ns.storeClosedConnections(closed)
ns.storeClosedConnection(closed)
}

// storeClosedConnection stores the given connection for every client
func (ns *networkState) storeClosedConnections(conns []ConnectionStats) {
func (ns *networkState) storeClosedConnection(c *ConnectionStats) {
for _, client := range ns.clients {
for _, c := range conns {
if i, ok := client.closed.byCookie[c.Cookie]; ok {
if ns.mergeConnectionStats(&client.closed.conns[i], &c) {
stateTelemetry.statsCookieCollisions.Inc()
client.closed.replaceAt(i, c)
}
continue
if i, ok := client.closed.byCookie[c.Cookie]; ok {
if ns.mergeConnectionStats(&client.closed.conns[i], c) {
stateTelemetry.statsCookieCollisions.Inc()
client.closed.replaceAt(i, c)
}
client.closed.insert(c, ns.maxClosedConns)
continue
}
client.closed.insert(c, ns.maxClosedConns)
}
}

Expand Down Expand Up @@ -1496,9 +1494,3 @@ func (ns *networkState) mergeConnectionStats(a, b *ConnectionStats) (collision b

return false
}

func isEmpty(conn ConnectionStats) bool {
return conn.Monotonic.RecvBytes == 0 && conn.Monotonic.RecvPackets == 0 &&
conn.Monotonic.SentBytes == 0 && conn.Monotonic.SentPackets == 0 &&
conn.Monotonic.Retransmits == 0 && len(conn.TCPFailures) == 0
}
Loading
Loading