Skip to content

Commit

Permalink
Merge branch 'iss2425-update-logging' into 'dev'
Browse files Browse the repository at this point in the history
优化日志采集性能

See merge request cloudcare-tools/datakit!3239
  • Loading branch information
谭彪 committed Oct 18, 2024
2 parents e5258bd + deb454e commit e8e6620
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 30 deletions.
3 changes: 2 additions & 1 deletion internal/logtail/textparser/dockerjson.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
package textparser

import (
"encoding/json"
"fmt"

"github.com/goccy/go-json"
)

type dockerjson struct {
Expand Down
9 changes: 9 additions & 0 deletions internal/logtail/textparser/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@ import (
"github.com/stretchr/testify/assert"
)

// BenchmarkParseDockerJsonLog measures the cost of parsing a single
// docker-json log record. The commit this benchmark accompanies replaces
// encoding/json with goccy/go-json for performance, so allocations per
// operation are a key signal: report them explicitly.
func BenchmarkParseDockerJsonLog(b *testing.B) {
	data := []byte(`{"log":"[INFO] test log, is partial","stream":"stdout","time":"2024-04-28T03:22:20.055429751Z"}`)

	b.ReportAllocs()
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		// Allocate a fresh LogMessage each iteration on purpose: the
		// per-record allocation is part of the real collection workload.
		msg := new(LogMessage)
		_ = ParseDockerJSONLog(data, msg)
	}
}

func TestParseDockerJsonLog(t *testing.T) {
cases := []struct {
in string
Expand Down
9 changes: 9 additions & 0 deletions internal/plugins/inputs/container/container_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@ package container
import (
"context"
"fmt"
"time"

"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/container/runtime"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/goroutine"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/logtail/fileprovider"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/logtail/openfile"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/tailer"
)

// defaultActiveDuration is the activity window applied to container log
// files: it is passed to tailer.WithIgnoreDeadLog and checked via
// openfile.FileIsActive, so files not modified within this duration are
// skipped for collection. NOTE(review): 3h appears to be an empirically
// chosen default — confirm against tailer/openfile semantics.
const defaultActiveDuration = time.Hour * 3

func (c *container) cleanMissingContainerLog(newIDs []string) {
missingIDs := c.logTable.findDifferences(newIDs)
for _, id := range missingIDs {
Expand Down Expand Up @@ -53,6 +57,7 @@ func (c *container) tailingLogs(ins *logInstance) {
tailer.WithRemoveAnsiEscapeCodes(cfg.RemoveAnsiEscapeCodes || c.ipt.LoggingRemoveAnsiEscapeCodes),
tailer.WithMaxForceFlushLimit(c.ipt.LoggingForceFlushLimit),
tailer.WithFileFromBeginningThresholdSize(int64(c.ipt.LoggingFileFromBeginningThresholdSize)),
tailer.WithIgnoreDeadLog(defaultActiveDuration),
tailer.WithDone(done),
}

Expand Down Expand Up @@ -83,6 +88,10 @@ func (c *container) tailingLogs(ins *logInstance) {
continue
}

if !openfile.FileIsActive(file, defaultActiveDuration) {
continue
}

l.Infof("add container log collection with path %s from source %s", file, cfg.Source)

newOpts := opts
Expand Down
16 changes: 0 additions & 16 deletions internal/tailer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
)

var (
multilineVec *prometheus.CounterVec
rotateVec *prometheus.CounterVec
forceFlushVec *prometheus.CounterVec
parseFailVec *prometheus.CounterVec
Expand All @@ -22,20 +21,6 @@ var (
)

func setupMetrics() {
multilineVec = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "datakit",
Subsystem: "tailer",
Name: "collect_multiline_state_total",
Help: "Tailer multiline state total",
},
[]string{
"source",
"filepath",
"multilinestate",
},
)

rotateVec = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "datakit",
Expand Down Expand Up @@ -122,7 +107,6 @@ func setupMetrics() {
[]string{"network"})

metrics.MustRegister(
multilineVec,
openfileVec,
)

Expand Down
17 changes: 4 additions & 13 deletions internal/tailer/tailer_single.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (t *Single) Run() {
}

func (t *Single) Close() {
t.recordingCache(true /*the last recording*/)
t.recordPosition()
t.closeFile()

openfileVec.WithLabelValues(t.opt.mode.String()).Dec()
Expand Down Expand Up @@ -183,21 +183,14 @@ func (t *Single) seekOffset() error {
return err
}

func (t *Single) recordingCache(last bool) {
func (t *Single) recordPosition() {
if t.offset <= 0 {
return
}

c := &recorder.MetaData{Source: t.opt.source, Offset: t.offset}

var err error
if last {
err = recorder.SetAndFlush(t.recordKey, c)
} else {
err = recorder.Set(t.recordKey, c)
}

if err != nil {
if err := recorder.SetAndFlush(t.recordKey, c); err != nil {
t.log.Debugf("recording cache %s err: %s", c, err)
}
}
Expand Down Expand Up @@ -403,7 +396,6 @@ func (t *Single) feed(pending [][]byte) {
t.feedToRemote(pending)
return
}
defer t.recordingCache(false)
t.feedToIO(pending)
}

Expand Down Expand Up @@ -505,8 +497,7 @@ func (t *Single) multiline(text []byte) []byte {
if t.mult == nil {
return text
}
res, state := t.mult.ProcessLine(text)
multilineVec.WithLabelValues(t.opt.source, t.filepath, state.String()).Inc()
res, _ := t.mult.ProcessLine(text)
return res
}

Expand Down

0 comments on commit e8e6620

Please sign in to comment.