Skip to content

Commit

Permalink
Lots more config and logging
Browse files Browse the repository at this point in the history
  • Loading branch information
nbrownus committed Apr 14, 2016
1 parent 47247f8 commit faa6bc0
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 31 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ canary: true
# Configure socket buffers, leave unset to use the system defaults
# Values will be doubled by the kernel
# It is recommended you do not set any of these values unless you really need to
socket_buffer:
# Default is net.core.rmem_default (/proc/sys/net/core/rmem_default)
# Maximum max is net.core.rmem_max (/proc/sys/net/core/rmem_max)
Expand All @@ -49,6 +50,9 @@ message_tracking:
# Log out of orderness, these messages typically signify an overloading system, default false
log_out_of_order: false
# Maximum out of orderness before a missed sequence is presumed dropped, default 500
max_out_of_order: 500
rules:
# Watch all 64 bit program executions
Expand Down
64 changes: 44 additions & 20 deletions audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ import (
)

func loadConfig(configLocation string) {
viper.SetDefault("canary", true)
viper.SetDefault("message_tracking.enabled", true)
viper.SetDefault("message_tracking.log_out_of_order", false)
viper.SetDefault("message_tracking.max_out_of_order", 500)

if configLocation == "" {
viper.SetConfigName("go-audit")
viper.AddConfigPath("/etc/audit")
Expand All @@ -32,41 +37,51 @@ func loadConfig(configLocation string) {
fmt.Println("Using config from", viper.ConfigFileUsed())
}

func main() {
configFile := flag.String("config", "", "Config file location, default /etc/audit/go-audit.yaml")
cpuProfile := flag.Bool("cpuprofile", false, "Enable cpu profiling")

flag.Parse()

loadConfig(*configFile)

if viper.GetBool("canary") {
go canaryRead()
}

func setRules() {
// Clear existing rules
err := exec.Command("auditctl", "-D").Run()
if err != nil {
fmt.Printf("Failed to flush existing audit rules. Error: %s\n", err)
os.Exit(1)
}

fmt.Println("Flushed existing rules")
fmt.Println("Flushed existing audit rules")

// Add ours in
if rules := viper.GetStringSlice("rules"); len(rules) != 0 {
for i, v := range rules {
// Skip rules with no content
if v == "" {
continue
}

err := exec.Command("auditctl", strings.Fields(v)...).Run()
if err != nil {
fmt.Printf("Failed to add rule #%d. Error: %s\n", i + 1, err)
fmt.Printf("Failed to add rule #%d. Error: %s \n", i + 1, err)
os.Exit(1)
}

fmt.Printf("Added rule #%d\n", i + 1)
fmt.Printf("Added audit rule #%d\n", i + 1)
}
} else {
fmt.Println("No rules found. Running with existing ruleset (may be empty!)")
fmt.Println("No audit rules found. exiting")
os.Exit(1)
}
}

func main() {
configFile := flag.String("config", "", "Config file location, default /etc/audit/go-audit.yaml")
cpuProfile := flag.Bool("cpuprofile", false, "Enable cpu profiling")

flag.Parse()

loadConfig(*configFile)

if viper.GetBool("canary") {
go canaryRead()
}

setRules()

if *cpuProfile {
fmt.Println("Enabling CPU profile ./cpu.pprof")
Expand All @@ -75,16 +90,25 @@ func main() {

//TODO: syslogWriter should be configurable
syslogWriter, _ := syslog.Dial("", "", syslog.LOG_LOCAL0 | syslog.LOG_WARNING, "go-audit")
nlClient := NewNetlinkClient()
marshaller := NewAuditMarshaller(syslogWriter)
nlClient := NewNetlinkClient(viper.GetInt("socket_buffer.receive"))
marshaller := NewAuditMarshaller(
syslogWriter,
viper.GetBool("message_tracking.enabled"),
viper.GetBool("message_tracking.log_out_of_order"),
viper.GetInt("message_tracking.max_out_of_order"),
)

fmt.Println("Starting")
fmt.Println("Starting to process events")

//Main loop. Get data from netlink and send it to the json lib for processing
for {
msg, err := nlClient.Receive()
if err != nil {
fmt.Println("Error during message receive:", err)
fmt.Printf("Error during message receive: %+v\n", err)
continue
}

if msg == nil {
continue
}

Expand Down
18 changes: 17 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type NetlinkClient struct {
buf []byte
}

func NewNetlinkClient() (*NetlinkClient) {
func NewNetlinkClient(recvSize int) (*NetlinkClient) {
fd, err := syscall.Socket(syscall.AF_NETLINK, syscall.SOCK_RAW, syscall.NETLINK_AUDIT)
if err != nil {
log.Fatal("Could not create a socket:", err)
Expand All @@ -59,6 +59,16 @@ func NewNetlinkClient() (*NetlinkClient) {
log.Fatal("Could not bind to netlink socket:", err)
}

// Set the buffer size if we were asked
if (recvSize > 0) {
err = syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_RCVBUF, recvSize)
}

// Print the current receive buffer size
if v, err := syscall.GetsockoptInt(n.fd, syscall.SOL_SOCKET, syscall.SO_RCVBUF); err == nil {
fmt.Println("Socket receive buffer size:", v)
}

go n.KeepConnection()

return n
Expand Down Expand Up @@ -93,6 +103,12 @@ func (n *NetlinkClient) Receive() (*syscall.NetlinkMessage, error) {
}

return msg, nil

//t := []byte{}
//_ = errors.New("nope")
//nlen, oobn, flags, _, err := syscall.Recvmsg(n.fd, n.buf, t, 0)
//fmt.Printf("nlen %+v, oobn %+v, flags %+v, err %+v\n", nlen, oobn, flags, err)
//return nil, nil
}

func (n *NetlinkClient) KeepConnection() {
Expand Down
55 changes: 45 additions & 10 deletions marshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,22 @@ type AuditMarshaller struct {
msgs map[int]*AuditMessageGroup
encoder *json.Encoder
lastSeq int
missed map[int]bool
worstLag int
trackMessages bool
logOutOfOrder bool
maxOutOfOrder int
}

// Create a new marshaller
func NewAuditMarshaller(w io.Writer) (*AuditMarshaller){
func NewAuditMarshaller(w io.Writer, trackMessages, logOOO bool, maxOOO int) (*AuditMarshaller){
return &AuditMarshaller{
encoder: json.NewEncoder(w),
msgs: make(map[int]*AuditMessageGroup, 5), // It is not typical to have more than 2 messagee groups at any given time
msgs: make(map[int]*AuditMessageGroup, 5), // It is not typical to have more than 2 message groups at any given time
missed: make(map[int]bool, 10),
trackMessages: trackMessages,
logOutOfOrder: logOOO,
maxOutOfOrder: maxOOO,
}
}

Expand All @@ -40,14 +49,8 @@ func (a *AuditMarshaller) Consume(nlMsg *syscall.NetlinkMessage) {
return
}

if aMsg.Seq > a.lastSeq + 1 && a.lastSeq != 0 {
// Detect if we lost any messages
fmt.Printf("Likely missed a packet, last seen: %d; current %d;\n", a.lastSeq, aMsg.Seq)
}

if aMsg.Seq > a.lastSeq {
// Keep track of the largest sequence
a.lastSeq = aMsg.Seq
if (a.trackMessages) {
a.detectMissing(aMsg.Seq)
}

if (nlMsg.Header.Type < EVENT_START || nlMsg.Header.Type > EVENT_END) {
Expand Down Expand Up @@ -98,3 +101,35 @@ func (a *AuditMarshaller) completeMessage(seq int) {

delete(a.msgs, seq)
}

// Track sequence numbers and log if we suspect we missed a message
func (a *AuditMarshaller) detectMissing(seq int) {
if seq > a.lastSeq + 1 && a.lastSeq != 0 {
// We likely leap frogged over a msg, wait until the next sequence to make sure
for i := a.lastSeq + 1; i < seq; i++ {
a.missed[i] = true
}
}

for missedSeq, _ := range a.missed {
if missedSeq == seq {
lag := a.lastSeq - missedSeq
if lag > a.worstLag {
a.worstLag = lag
}

if (a.logOutOfOrder) {
fmt.Println("Got sequence", missedSeq, "after", lag, "messages. Worst lag so far", a.worstLag, "messages")
}
delete(a.missed, missedSeq)
} else if seq - missedSeq > a.maxOutOfOrder {
fmt.Printf("Likely missed sequence %d, current %d, worst message delay %d\n", missedSeq, seq, a.worstLag)
delete(a.missed, missedSeq)
}
}

if seq > a.lastSeq {
// Keep track of the largest sequence
a.lastSeq = seq
}
}

0 comments on commit faa6bc0

Please sign in to comment.