Skip to content

Commit

Permalink
V2 Auto Multiline Detection - Fix truncation + Clean up logs parser tag handling (#28765)
Browse files Browse the repository at this point in the history
  • Loading branch information
gh123man authored Aug 28, 2024
1 parent f8015be commit 2f3053a
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 61 deletions.
6 changes: 3 additions & 3 deletions pkg/config/setup/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1502,6 +1502,9 @@ func logsagent(config pkgconfigmodel.Setup) {
config.BindEnvAndSetDefault("logs_config.auto_multi_line.tokenizer_max_input_bytes", 60)
config.BindEnvAndSetDefault("logs_config.auto_multi_line.pattern_table_max_size", 20)
config.BindEnvAndSetDefault("logs_config.auto_multi_line.pattern_table_match_threshold", 0.75)
config.BindEnvAndSetDefault("logs_config.tag_auto_multi_line_logs", false)
// Add a tag to logs that are truncated by the agent
config.BindEnvAndSetDefault("logs_config.tag_truncated_logs", false)

// If true, the agent looks for container logs in the location used by podman, rather
// than docker. This is a temporary configuration parameter to support podman logs until
Expand Down Expand Up @@ -1542,9 +1545,6 @@ func logsagent(config pkgconfigmodel.Setup) {

// Max size in MB to allow for integrations logs files
config.BindEnvAndSetDefault("logs_config.integrations_logs_files_max_size", 100)

// Add a tag to file logs that are truncated by the agent
config.BindEnvAndSetDefault("logs_config.tag_truncated_logs", false)
}

func vector(config pkgconfigmodel.Setup) {
Expand Down
49 changes: 37 additions & 12 deletions pkg/logs/internal/decoder/auto_multiline_detection/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,42 +14,66 @@ import (
)

type bucket struct {
message *message.Message
buffer *bytes.Buffer
lineCount int
tagTruncatedLogs bool
tagMultiLineLogs bool

message *message.Message
originalDataLen int
buffer *bytes.Buffer
lineCount int
truncated bool
}

func (b *bucket) add(msg *message.Message) {
if b.message == nil {
b.message = msg
}
if b.buffer.Len() > 0 {
if b.originalDataLen > 0 {
b.buffer.Write(message.EscapedLineFeed)
}
b.buffer.Write(msg.GetContent())
b.originalDataLen += msg.RawDataLen
b.lineCount++
}

func (b *bucket) isEmpty() bool {
return b.buffer.Len() == 0
return b.originalDataLen == 0
}

// truncate flags the bucket as truncated and appends message.TruncatedFlag
// to the content buffered so far.
func (b *bucket) truncate() {
	b.truncated = true
	_, _ = b.buffer.Write(message.TruncatedFlag)
}

func (b *bucket) flush() *message.Message {
defer func() {
b.buffer.Reset()
b.message = nil
b.lineCount = 0
b.originalDataLen = 0
b.truncated = false
}()

originalLen := b.buffer.Len()
data := bytes.TrimSpace(b.buffer.Bytes())
content := make([]byte, len(data))
copy(content, data)

msg := message.NewRawMessage(content, b.message.Status, b.originalDataLen, b.message.ParsingExtra.Timestamp)

if b.lineCount > 1 {
return message.NewRawMultiLineMessage(content, b.message.Status, originalLen, b.message.ParsingExtra.Timestamp)
msg.ParsingExtra.IsMultiLine = true
if b.tagMultiLineLogs {
msg.ParsingExtra.Tags = append(msg.ParsingExtra.Tags, message.AutoMultiLineTag)
}
}
return message.NewRawMessage(content, b.message.Status, originalLen, b.message.ParsingExtra.Timestamp)

if b.truncated {
msg.ParsingExtra.IsTruncated = true
if b.tagTruncatedLogs {
msg.ParsingExtra.Tags = append(msg.ParsingExtra.Tags, message.TruncatedTag)
}
}
return msg
}

// Aggregator aggregates multiline logs with a given label.
Expand All @@ -62,10 +86,10 @@ type Aggregator struct {
}

// NewAggregator creates a new aggregator.
func NewAggregator(outputFn func(m *message.Message), maxContentSize int, flushTimeout time.Duration) *Aggregator {
func NewAggregator(outputFn func(m *message.Message), maxContentSize int, flushTimeout time.Duration, tagTruncatedLogs bool, tagMultiLineLogs bool) *Aggregator {
return &Aggregator{
outputFn: outputFn,
bucket: &bucket{buffer: bytes.NewBuffer(nil)},
bucket: &bucket{buffer: bytes.NewBuffer(nil), tagTruncatedLogs: tagTruncatedLogs, tagMultiLineLogs: tagMultiLineLogs},
maxContentSize: maxContentSize,
flushTimeout: flushTimeout,
}
Expand Down Expand Up @@ -97,9 +121,10 @@ func (a *Aggregator) Aggregate(msg *message.Message, label Label) {

// At this point we either have `startGroup` with an empty bucket or `aggregate` with a non-empty bucket
// so we add the message to the bucket or flush if the bucket will overflow the max content size.

if msg.RawDataLen+a.bucket.buffer.Len() > a.maxContentSize {
a.bucket.flush()
a.bucket.truncate() // Truncate the end of the current bucket
a.Flush()
a.bucket.truncate() // Truncate the start of the next bucket
}

a.bucket.add(msg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ func assertMessageContent(t *testing.T, m *message.Message, content string) {
}

func TestNoAggregate(t *testing.T) {

outputChan, outputFn := makeHandler()
ag := NewAggregator(outputFn, 100, time.Duration(1*time.Second))
ag := NewAggregator(outputFn, 100, time.Duration(1*time.Second), false, false)

ag.Aggregate(newMessage("1"), noAggregate)
ag.Aggregate(newMessage("2"), noAggregate)
Expand All @@ -50,7 +49,7 @@ func TestNoAggregate(t *testing.T) {
func TestNoAggregateEndsGroup(t *testing.T) {

outputChan, outputFn := makeHandler()
ag := NewAggregator(outputFn, 100, time.Duration(1*time.Second))
ag := NewAggregator(outputFn, 100, time.Duration(1*time.Second), false, false)

ag.Aggregate(newMessage("1"), startGroup)
ag.Aggregate(newMessage("2"), startGroup)
Expand All @@ -62,9 +61,8 @@ func TestNoAggregateEndsGroup(t *testing.T) {
}

func TestAggregateGroups(t *testing.T) {

outputChan, outputFn := makeHandler()
ag := NewAggregator(outputFn, 100, time.Duration(1*time.Second))
ag := NewAggregator(outputFn, 100, time.Duration(1*time.Second), false, false)

// Aggregated log
ag.Aggregate(newMessage("1"), startGroup)
Expand All @@ -83,9 +81,8 @@ func TestAggregateGroups(t *testing.T) {
}

func TestAggregateDoesntStartGroup(t *testing.T) {

outputChan, outputFn := makeHandler()
ag := NewAggregator(outputFn, 100, time.Duration(1*time.Second))
ag := NewAggregator(outputFn, 100, time.Duration(1*time.Second), false, false)

ag.Aggregate(newMessage("1"), aggregate)
ag.Aggregate(newMessage("2"), aggregate)
Expand All @@ -97,9 +94,8 @@ func TestAggregateDoesntStartGroup(t *testing.T) {
}

func TestForceFlush(t *testing.T) {

outputChan, outputFn := makeHandler()
ag := NewAggregator(outputFn, 100, time.Duration(1*time.Second))
ag := NewAggregator(outputFn, 100, time.Duration(1*time.Second), false, false)

ag.Aggregate(newMessage("1"), startGroup)
ag.Aggregate(newMessage("2"), aggregate)
Expand All @@ -110,9 +106,8 @@ func TestForceFlush(t *testing.T) {
}

func TestAggregationTimer(t *testing.T) {

outputChan, outputFn := makeHandler()
ag := NewAggregator(outputFn, 100, time.Duration(1*time.Second))
ag := NewAggregator(outputFn, 100, time.Duration(1*time.Second), false, false)

assert.Nil(t, ag.FlushChan())
ag.Aggregate(newMessage("1"), startGroup)
Expand All @@ -126,3 +121,55 @@ func TestAggregationTimer(t *testing.T) {
assertMessageContent(t, <-outputChan, "1")
assertMessageContent(t, <-outputChan, "2")
}

// TestTagTruncatedLogs checks that, with tagTruncatedLogs enabled, both halves of a
// log split on the max content size are marked as truncated and carry the truncated
// tag, while a following message that is not split stays untagged.
func TestTagTruncatedLogs(t *testing.T) {
	outputChan, outputFn := makeHandler()
	ag := NewAggregator(outputFn, 10, time.Duration(1*time.Second), true, false)

	ag.Aggregate(newMessage("1234567890"), startGroup)
	ag.Aggregate(newMessage("1"), aggregate) // Causes overflow, truncate and flush
	ag.Aggregate(newMessage("2"), noAggregate)

	// First half: the truncation flag is appended after the content.
	// assert.Equal takes (expected, actual) so failure output labels the values correctly.
	msg := <-outputChan
	assert.True(t, msg.ParsingExtra.IsTruncated)
	assert.Equal(t, []string{message.TruncatedTag}, msg.ParsingExtra.Tags)
	assertMessageContent(t, msg, "1234567890...TRUNCATED...")

	// Second half: the truncation flag is prepended before the content.
	msg = <-outputChan
	assert.True(t, msg.ParsingExtra.IsTruncated)
	assert.Equal(t, []string{message.TruncatedTag}, msg.ParsingExtra.Tags)
	assertMessageContent(t, msg, "...TRUNCATED...1")

	// Unrelated message: neither flagged nor tagged.
	msg = <-outputChan
	assert.False(t, msg.ParsingExtra.IsTruncated)
	assert.Empty(t, msg.ParsingExtra.Tags)
	assertMessageContent(t, msg, "2")
}

// TestTagMultiLineLogs checks that, with tagMultiLineLogs enabled (and tagTruncatedLogs
// disabled), only messages aggregated from more than one line receive the auto multiline
// tag, and that truncation is still reported through IsTruncated without adding a tag.
func TestTagMultiLineLogs(t *testing.T) {
	outputChan, outputFn := makeHandler()
	ag := NewAggregator(outputFn, 10, time.Duration(1*time.Second), false, true)

	ag.Aggregate(newMessage("12345"), startGroup)
	ag.Aggregate(newMessage("67890"), aggregate)
	ag.Aggregate(newMessage("1"), aggregate) // Causes overflow, truncate and flush
	ag.Aggregate(newMessage("2"), noAggregate)

	// Two lines were combined before the overflow: multiline, truncated, multiline tag only.
	// assert.Equal takes (expected, actual) so failure output labels the values correctly.
	msg := <-outputChan
	assert.True(t, msg.ParsingExtra.IsMultiLine)
	assert.True(t, msg.ParsingExtra.IsTruncated)
	assert.Equal(t, []string{message.AutoMultiLineTag}, msg.ParsingExtra.Tags)
	assertMessageContent(t, msg, "12345\\n67890...TRUNCATED...")

	// Remainder of the split group: a single line, so not multiline, but still truncated.
	msg = <-outputChan
	assert.False(t, msg.ParsingExtra.IsMultiLine)
	assert.True(t, msg.ParsingExtra.IsTruncated)
	assert.Empty(t, msg.ParsingExtra.Tags)
	assertMessageContent(t, msg, "...TRUNCATED...1")

	// Unrelated message: no flags and no tags.
	msg = <-outputChan
	assert.False(t, msg.ParsingExtra.IsMultiLine)
	assert.False(t, msg.ParsingExtra.IsTruncated)
	assert.Empty(t, msg.ParsingExtra.Tags)
	assertMessageContent(t, msg, "2")
}
9 changes: 7 additions & 2 deletions pkg/logs/internal/decoder/auto_multiline_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,13 @@ func NewAutoMultilineHandler(outputFn func(m *message.Message), maxContentSize i
}

return &AutoMultilineHandler{
labeler: automultilinedetection.NewLabeler(heuristics),
aggregator: automultilinedetection.NewAggregator(outputFn, maxContentSize, flushTimeout),
labeler: automultilinedetection.NewLabeler(heuristics),
aggregator: automultilinedetection.NewAggregator(
outputFn,
maxContentSize,
flushTimeout,
config.Datadog().GetBool("logs_config.tag_truncated_logs"),
config.Datadog().GetBool("logs_config.tag_auto_multi_line_logs")),
}
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/logs/internal/decoder/multiline_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"regexp"
"time"

coreConfig "github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/logs/message"
status "github.com/DataDog/datadog-agent/pkg/logs/status/utils"
"github.com/DataDog/datadog-agent/pkg/telemetry"
Expand Down Expand Up @@ -162,6 +163,9 @@ func (h *MultiLineHandler) sendBuffer() {
}
msg := message.NewRawMessage(content, h.status, h.linesLen, h.timestamp)
msg.ParsingExtra.IsTruncated = h.isBufferTruncated
if h.isBufferTruncated && coreConfig.Datadog().GetBool("logs_config.tag_truncated_logs") {
msg.ParsingExtra.Tags = append(msg.ParsingExtra.Tags, message.TruncatedTag)
}
h.outputFn(msg)
}
}
11 changes: 9 additions & 2 deletions pkg/logs/internal/decoder/single_line_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"bytes"
"time"

coreConfig "github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/logs/message"
)

Expand Down Expand Up @@ -36,6 +37,12 @@ func (h *SingleLineHandler) flush() {
// do nothing
}

// addTruncatedTag appends message.TruncatedTag to the message's parsing tags,
// but only when the logs_config.tag_truncated_logs setting is enabled.
func addTruncatedTag(msg *message.Message) {
	tagEnabled := coreConfig.Datadog().GetBool("logs_config.tag_truncated_logs")
	if !tagEnabled {
		return
	}
	msg.ParsingExtra.Tags = append(msg.ParsingExtra.Tags, message.TruncatedTag)
}

// process transforms a raw line into a structured line,
// it guarantees that the content of the line won't exceed
// the limit and that the length of the line is properly tracked
Expand All @@ -52,7 +59,7 @@ func (h *SingleLineHandler) process(msg *message.Message) {
// the new line is just a remainder,
// adding the truncated flag at the beginning of the content
content = append(message.TruncatedFlag, content...)
msg.ParsingExtra.IsTruncated = true
addTruncatedTag(msg)
}

// how should we detect logs which are too long before rendering them?
Expand All @@ -64,7 +71,7 @@ func (h *SingleLineHandler) process(msg *message.Message) {
// adding the truncated flag at the end of the content
content = append(content, message.TruncatedFlag...)
msg.SetContent(content) // refresh the content in the message
msg.ParsingExtra.IsTruncated = true
addTruncatedTag(msg)
h.outputFn(msg)
// make sure the following part of the line will be cut off as well
h.shouldTruncate = true
Expand Down
28 changes: 7 additions & 21 deletions pkg/logs/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ var TruncatedFlag = []byte("...TRUNCATED...")
// TruncatedTag is added to truncated log messages (if enabled).
const TruncatedTag = "truncated"

// AutoMultiLineTag is added to multiline log messages (if enabled).
const AutoMultiLineTag = "auto_multiline"

// EscapedLineFeed is used to escape new line character
// for multiline message.
// New line character needs to be escaped because they are used
Expand Down Expand Up @@ -49,8 +52,7 @@ type Message struct {
IngestionTimestamp int64
// RawDataLen tracks the original size of the message content before any trimming/transformation.
// This is used when calculating the tailer offset - so this will NOT always be equal to `len(Content)`.
RawDataLen int
IsMultiLine bool
RawDataLen int
// Tags added on processing
ProcessingTags []string
// Extra information from the parsers
Expand Down Expand Up @@ -172,6 +174,7 @@ type ParsingExtra struct {
Timestamp string
IsPartial bool
IsTruncated bool
IsMultiLine bool
Tags []string
}

Expand Down Expand Up @@ -222,26 +225,9 @@ func NewRawMessage(content []byte, status string, rawDataLen int, readTimestamp
RawDataLen: rawDataLen,
IngestionTimestamp: time.Now().UnixNano(),
ParsingExtra: ParsingExtra{
Timestamp: readTimestamp,
},
IsMultiLine: false,
}
}

// NewRawMultiLineMessage returns a new encoded message.
func NewRawMultiLineMessage(content []byte, status string, rawDataLen int, readTimestamp string) *Message {
return &Message{
MessageContent: MessageContent{
content: content,
State: StateUnstructured,
},
Status: status,
RawDataLen: rawDataLen,
IngestionTimestamp: time.Now().UnixNano(),
ParsingExtra: ParsingExtra{
Timestamp: readTimestamp,
Timestamp: readTimestamp,
IsMultiLine: false,
},
IsMultiLine: true,
}
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/logs/tailers/docker/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,10 @@ func (t *Tailer) forwardMessages() {
origin.Offset = output.ParsingExtra.Timestamp
t.setLastSince(output.ParsingExtra.Timestamp)
origin.Identifier = t.Identifier()
origin.SetTags(t.tagProvider.GetTags())
tags := []string{}
tags = append(tags, output.ParsingExtra.Tags...)
tags = append(tags, t.tagProvider.GetTags()...)
origin.SetTags(tags)
// XXX(remy): is it OK recreating a message here?
t.outputChan <- message.NewMessage(output.GetContent(), origin, output.Status, output.IngestionTimestamp)
}
Expand Down
5 changes: 0 additions & 5 deletions pkg/logs/tailers/file/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,11 +348,6 @@ func (t *Tailer) forwardMessages() {
tags := make([]string, len(t.tags))
copy(tags, t.tags)
tags = append(tags, t.tagProvider.GetTags()...)

if output.ParsingExtra.IsTruncated && coreConfig.Datadog().GetBool("logs_config.tag_truncated_logs") {
tags = append(tags, message.TruncatedTag)
}

tags = append(tags, output.ParsingExtra.Tags...)
origin.SetTags(tags)
// Ignore empty lines once the registry offset is updated
Expand Down
Loading

0 comments on commit 2f3053a

Please sign in to comment.