From cf6b53040aa67b851f34839791480c01139ef3f8 Mon Sep 17 00:00:00 2001 From: RJ Kanson <9116105+rkanson@users.noreply.github.com> Date: Tue, 16 Jul 2024 15:11:33 -0400 Subject: [PATCH 1/7] [minor] implement large message handling --- client.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/client.go b/client.go index e111db0..a7b77ac 100644 --- a/client.go +++ b/client.go @@ -8,6 +8,7 @@ import ( "sync/atomic" "syscall" "time" + "os" "github.com/pantheon-systems/pauditd/pkg/slog" ) @@ -120,6 +121,26 @@ func (n *NetlinkClient) Send(np *NetlinkPacket, a *AuditStatusPayload) error { // Receive will receive a packet from a netlink socket func (n *NetlinkClient) Receive() (*syscall.NetlinkMessage, error) { + // Large message handling + // See https://mdlayher.com/blog/linux-netlink-and-go-part-1-netlink/ + b := make([]byte, os.Getpagesize()) + for { + // Peek at the buffer to see how many bytes are available. + n, _, err := syscall.Recvfrom(n.fd, n.buf, syscall.MSG_PEEK) + if err != nil { + return nil, err + } + + // Break when we can read all messages. + if n < len(b) { + break + } + + // Double in size if not enough bytes. + b = make([]byte, len(b)*2) + } + + // Read out all available messages. nlen, _, err := syscall.Recvfrom(n.fd, n.buf, 0) if err != nil { return nil, err From a05557409cdeef7938d30fad0bbb9db10242be3e Mon Sep 17 00:00:00 2001 From: RJ Kanson <9116105+rkanson@users.noreply.github.com> Date: Tue, 16 Jul 2024 15:23:52 -0400 Subject: [PATCH 2/7] handle messages in a goroutine --- audit.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/audit.go b/audit.go index e16acf2..9ae5190 100644 --- a/audit.go +++ b/audit.go @@ -8,6 +8,7 @@ import ( "os" "os/exec" "strings" + "syscall" "github.com/pantheon-systems/pauditd/pkg/marshaller" "github.com/pantheon-systems/pauditd/pkg/metric" @@ -201,7 +202,6 @@ func main() { //Main loop. Get data from netlink and send it to the json lib for processing for { msg, err := nlClient.Receive() - timing := metric.GetClient().NewTiming() // measure latency from recipt of message if err != nil { if err.Error() == "no buffer space available" { metric.GetClient().Increment("messages.netlink_dropped") @@ -209,13 +209,18 @@ func main() { slog.Error.Printf("Error during message receive: %+v\n", err) continue } - - metric.GetClient().Increment("messages.total") if msg == nil { continue } - - marshaller.Consume(msg) - timing.Send("latency") + // As soon as we have a message, spawn a goroutine to handle it and free up the main loop + go handleMsg(msg, marshaller) } } + +func handleMsg(msg *syscall.NetlinkMessage, marshaller *marshaller.AuditMarshaller) { + timing := metric.GetClient().NewTiming() // measure latency from recipt of message + metric.GetClient().Increment("messages.total") + + marshaller.Consume(msg) + timing.Send("latency") +} From 9d3ca1ff03e07c2d98019871aab2deae7bbc68dd Mon Sep 17 00:00:00 2001 From: RJ Kanson <9116105+rkanson@users.noreply.github.com> Date: Tue, 16 Jul 2024 15:59:55 -0400 Subject: [PATCH 3/7] fetch the rmem_max value to not need a hardcoded value in a config --- audit.go | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/audit.go b/audit.go index 9ae5190..14bdfdf 100644 --- a/audit.go +++ b/audit.go @@ -176,7 +176,29 @@ func main() { slog.Error.Fatal(err) } - nlClient, err := NewNetlinkClient(config.GetInt("socket_buffer.receive")) + recvSize := 0 + // Fetch the max value we can set from /proc/sys/net/core/rmem_max + // This value is mounted in from the host via the kube yaml + // Open the file + file, err := os.Open("/proc/sys/net/core/rmem_max") + if err != nil { + slog.Error.Fatal(fmt.Sprintf("Error opening rmem_max: [%v]", err)) + } + defer file.Close() + // Read the value + var rmemMax int + _, err = fmt.Fscanf(file, "%d", &rmemMax) + if err != nil { + slog.Error.Fatal(fmt.Sprintf("Error reading the rmem_max value: [%v]", err)) + } + // If the value is 0, use the default value from the config + recvSize = rmemMax + if rmemMax == 0 { + recvSize = config.GetInt("socket_buffer.receive") + } + slog.Info.Printf("Setting the receive buffer size to %d\n", recvSize) + + nlClient, err := NewNetlinkClient(recvSize) if err != nil { slog.Error.Fatal(err) } From 28794c05b832377f8d7c9e00c67b63358c1ccf1b Mon Sep 17 00:00:00 2001 From: RJ Kanson <9116105+rkanson@users.noreply.github.com> Date: Tue, 16 Jul 2024 16:23:24 -0400 Subject: [PATCH 4/7] extract to new func + dont use fatal --- audit.go | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/audit.go b/audit.go index 14bdfdf..47dee1d 100644 --- a/audit.go +++ b/audit.go @@ -177,20 +177,7 @@ func main() { } recvSize := 0 - // Fetch the max value we can set from /proc/sys/net/core/rmem_max - // This value is mounted in from the host via the kube yaml - // Open the file - file, err := os.Open("/proc/sys/net/core/rmem_max") - if err != nil { - slog.Error.Fatal(fmt.Sprintf("Error opening rmem_max: [%v]", err)) - } - defer file.Close() - // Read the value - var rmemMax int - _, err = fmt.Fscanf(file, "%d", &rmemMax) - if err != nil { - slog.Error.Fatal(fmt.Sprintf("Error reading the rmem_max value: [%v]", err)) - } + rmemMax := fetchRmemMax() // If the value is 0, use the default value from the config recvSize = rmemMax if rmemMax == 0 { @@ -239,6 +226,23 @@ func main() { } } +// Fetch the max value we can set from /proc/sys/net/core/rmem_max +// This value is mounted in from the host via the kube yaml +func fetchRmemMax() int { + var rmemMax int + file, err := os.Open("/proc/sys/net/core/rmem_max") + if err != nil { + slog.Error.Println(fmt.Sprintf("Error opening rmem_max: [%v]", err)) + } + defer file.Close() + + _, err = fmt.Fscanf(file, "%d", &rmemMax) + if err != nil { + slog.Error.Println(fmt.Sprintf("Error reading the rmem_max value: [%v]", err)) + } + return rmemMax +} + func handleMsg(msg *syscall.NetlinkMessage, marshaller *marshaller.AuditMarshaller) { timing := metric.GetClient().NewTiming() // measure latency from recipt of message metric.GetClient().Increment("messages.total") From 7acf9a67adc4b8e56d6be2ab4c1f95fd38f72606 Mon Sep 17 00:00:00 2001 From: RJ Kanson <9116105+rkanson@users.noreply.github.com> Date: Tue, 16 Jul 2024 16:35:39 -0400 Subject: [PATCH 5/7] create and use a local buf for each receive call --- client.go | 43 +++++++++++++++++++++---------------------- client_test.go | 2 -- 2 files changed, 21 insertions(+), 24 deletions(-) diff --git a/client.go b/client.go index a7b77ac..189bcb6 100644 --- a/client.go +++ b/client.go @@ -8,7 +8,6 @@ import ( "sync/atomic" "syscall" "time" - "os" "github.com/pantheon-systems/pauditd/pkg/slog" ) @@ -21,7 +20,7 @@ const ( MAX_AUDIT_MESSAGE_LENGTH = 8970 ) -//TODO: this should live in a marshaller +// TODO: this should live in a marshaller type AuditStatusPayload struct { Mask uint32 Enabled uint32 @@ -56,7 +55,6 @@ func NewNetlinkClient(recvSize int) (*NetlinkClient, error) { n := &NetlinkClient{ fd: fd, address: &syscall.SockaddrNetlink{Family: syscall.AF_NETLINK, Groups: 0, Pid: 0}, - buf: make([]byte, MAX_AUDIT_MESSAGE_LENGTH), cancelKeepConnection: make(chan struct{}), } @@ -123,25 +121,26 @@ func (n *NetlinkClient) Send(np *NetlinkPacket, a *AuditStatusPayload) error { func (n *NetlinkClient) Receive() (*syscall.NetlinkMessage, error) { // Large message handling // See https://mdlayher.com/blog/linux-netlink-and-go-part-1-netlink/ - b := make([]byte, os.Getpagesize()) + // Use a new buffer every time since this is spawned in a goroutine + buf := make([]byte, MAX_AUDIT_MESSAGE_LENGTH) for { - // Peek at the buffer to see how many bytes are available. - n, _, err := syscall.Recvfrom(n.fd, n.buf, syscall.MSG_PEEK) - if err != nil { - return nil, err - } + // Peek at the buffer to see how many bytes are available. + b, _, err := syscall.Recvfrom(n.fd, buf, syscall.MSG_PEEK) + if err != nil { + return nil, err + } - // Break when we can read all messages. - if n < len(b) { - break - } + // Break when we can read all messages. + if b < len(buf) { + break + } - // Double in size if not enough bytes. - b = make([]byte, len(b)*2) + // Double in size if not enough bytes. + buf = make([]byte, len(buf)*2) } // Read out all available messages. - nlen, _, err := syscall.Recvfrom(n.fd, n.buf, 0) + nlen, _, err := syscall.Recvfrom(n.fd, buf, 0) if err != nil { return nil, err } @@ -152,13 +151,13 @@ func (n *NetlinkClient) Receive() (*syscall.NetlinkMessage, error) { msg := &syscall.NetlinkMessage{ Header: syscall.NlMsghdr{ - Len: Endianness.Uint32(n.buf[0:4]), - Type: Endianness.Uint16(n.buf[4:6]), - Flags: Endianness.Uint16(n.buf[6:8]), - Seq: Endianness.Uint32(n.buf[8:12]), - Pid: Endianness.Uint32(n.buf[12:16]), + Len: Endianness.Uint32(buf[0:4]), + Type: Endianness.Uint16(buf[4:6]), + Flags: Endianness.Uint16(buf[6:8]), + Seq: Endianness.Uint32(buf[8:12]), + Pid: Endianness.Uint32(buf[12:16]), }, - Data: n.buf[syscall.SizeofNlMsghdr:nlen], + Data: buf[syscall.SizeofNlMsghdr:nlen], } return msg, nil diff --git a/client_test.go b/client_test.go index 689bed6..e084412 100644 --- a/client_test.go +++ b/client_test.go @@ -103,8 +103,6 @@ func TestNewNetlinkClient(t *testing.T) { assert.True(t, (n.fd > 0), "No file descriptor") assert.True(t, (n.address != nil), "Address was nil") assert.Equal(t, uint32(0), n.seq, "Seq should start at 0") - assert.True(t, MAX_AUDIT_MESSAGE_LENGTH >= len(n.buf), "Client buffer is too small") - assert.Equal(t, "Socket receive buffer size: ", lb.String()[:28], "Expected some nice log lines") assert.Equal(t, "", elb.String(), "Did not expect any error messages") } From 7a8921ba30f3ad132a4d90e6de83c2cba1e43075 Mon Sep 17 00:00:00 2001 From: RJ Kanson <9116105+rkanson@users.noreply.github.com> Date: Wed, 17 Jul 2024 13:32:41 -0400 Subject: [PATCH 6/7] handle a panic in the goroutine --- audit.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/audit.go b/audit.go index 47dee1d..006317d 100644 --- a/audit.go +++ b/audit.go @@ -244,6 +244,12 @@ func fetchRmemMax() int { } func handleMsg(msg *syscall.NetlinkMessage, marshaller *marshaller.AuditMarshaller) { + defer func() { + if r := recover(); r != nil { + slog.Error.Printf("Panic occurred in handleMsg: %v", r) + } + }() + timing := metric.GetClient().NewTiming() // measure latency from recipt of message metric.GetClient().Increment("messages.total") From 04c747b6f7a6b601515636da1a57b0bf19b35673 Mon Sep 17 00:00:00 2001 From: RJ Kanson <9116105+rkanson@users.noreply.github.com> Date: Fri, 19 Jul 2024 16:08:28 -0400 Subject: [PATCH 7/7] build-release for all PRs --- .circleci/config.yml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 8d55d8e..59ef17e 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -52,7 +52,6 @@ jobs: - run: package_cloud push pantheon/public/fedora/29 ./dist/*.rpm - run: package_cloud push pantheon/public/el/7 ./dist/*.rpm - workflows: version: 2 test-build-release: @@ -66,10 +65,6 @@ workflows: - sig-go-release requires: - test - filters: - branches: - only: - - master - publish-rpm: requires: - build-release @@ -77,4 +72,3 @@ workflows: branches: only: - master -