From deb454eacf6c6b0dee49e7cdbfeb91236f462951 Mon Sep 17 00:00:00 2001 From: liguozhuang Date: Fri, 18 Oct 2024 18:26:12 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=97=A5=E5=BF=97=E9=87=87?= =?UTF-8?q?=E9=9B=86=E6=80=A7=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/logtail/textparser/dockerjson.go | 3 ++- internal/logtail/textparser/message_test.go | 9 +++++++++ .../plugins/inputs/container/container_log.go | 9 +++++++++ internal/tailer/metrics.go | 16 ---------------- internal/tailer/tailer_single.go | 17 ++++------------- 5 files changed, 24 insertions(+), 30 deletions(-) diff --git a/internal/logtail/textparser/dockerjson.go b/internal/logtail/textparser/dockerjson.go index f5c7c63e85..e8204a8a66 100644 --- a/internal/logtail/textparser/dockerjson.go +++ b/internal/logtail/textparser/dockerjson.go @@ -6,8 +6,9 @@ package textparser import ( - "encoding/json" "fmt" + + "github.com/goccy/go-json" ) type dockerjson struct { diff --git a/internal/logtail/textparser/message_test.go b/internal/logtail/textparser/message_test.go index b898093ffc..48545435ca 100644 --- a/internal/logtail/textparser/message_test.go +++ b/internal/logtail/textparser/message_test.go @@ -11,6 +11,15 @@ import ( "github.com/stretchr/testify/assert" ) +func BenchmarkParseDockerJsonLog(b *testing.B) { + data := []byte(`{"log":"[INFO] test log, is partial","stream":"stdout","time":"2024-04-28T03:22:20.055429751Z"}`) + + for i := 0; i < b.N; i++ { + msg := new(LogMessage) + _ = ParseDockerJSONLog(data, msg) + } +} + func TestParseDockerJsonLog(t *testing.T) { cases := []struct { in string diff --git a/internal/plugins/inputs/container/container_log.go b/internal/plugins/inputs/container/container_log.go index 93d2ef7345..f768f90ad9 100644 --- a/internal/plugins/inputs/container/container_log.go +++ b/internal/plugins/inputs/container/container_log.go @@ -8,14 +8,18 @@ package container import ( "context" "fmt" + "time" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/container/runtime" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/goroutine" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/logtail/fileprovider" + "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/logtail/openfile" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/tailer" ) +const defaultActiveDuration = time.Hour * 3 + func (c *container) cleanMissingContainerLog(newIDs []string) { missingIDs := c.logTable.findDifferences(newIDs) for _, id := range missingIDs { @@ -53,6 +57,7 @@ func (c *container) tailingLogs(ins *logInstance) { tailer.WithRemoveAnsiEscapeCodes(cfg.RemoveAnsiEscapeCodes || c.ipt.LoggingRemoveAnsiEscapeCodes), tailer.WithMaxForceFlushLimit(c.ipt.LoggingForceFlushLimit), tailer.WithFileFromBeginningThresholdSize(int64(c.ipt.LoggingFileFromBeginningThresholdSize)), + tailer.WithIgnoreDeadLog(defaultActiveDuration), tailer.WithDone(done), } @@ -83,6 +88,10 @@ func (c *container) tailingLogs(ins *logInstance) { continue } + if !openfile.FileIsActive(file, defaultActiveDuration) { + continue + } + l.Infof("add container log collection with path %s from source %s", file, cfg.Source) newOpts := opts diff --git a/internal/tailer/metrics.go b/internal/tailer/metrics.go index 1f7a894651..9cc19f8203 100644 --- a/internal/tailer/metrics.go +++ b/internal/tailer/metrics.go @@ -11,7 +11,6 @@ import ( ) var ( - multilineVec *prometheus.CounterVec rotateVec *prometheus.CounterVec forceFlushVec *prometheus.CounterVec parseFailVec *prometheus.CounterVec @@ -22,20 +21,6 @@ var ( ) func setupMetrics() { - multilineVec = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "datakit", - Subsystem: "tailer", - Name: "collect_multiline_state_total", - Help: "Tailer multiline state total", - }, - []string{ - "source", - "filepath", - "multilinestate", - }, - ) - rotateVec = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "datakit", @@ -122,7 +107,6 @@ func setupMetrics() { []string{"network"}) metrics.MustRegister( - multilineVec, openfileVec, ) diff --git a/internal/tailer/tailer_single.go b/internal/tailer/tailer_single.go index 8535197899..2b0b94b94c 100644 --- a/internal/tailer/tailer_single.go +++ b/internal/tailer/tailer_single.go @@ -122,7 +122,7 @@ func (t *Single) Run() { } func (t *Single) Close() { - t.recordingCache(true /*the last recording*/) + t.recordPosition() t.closeFile() openfileVec.WithLabelValues(t.opt.mode.String()).Dec() @@ -183,21 +183,14 @@ func (t *Single) seekOffset() error { return err } -func (t *Single) recordingCache(last bool) { +func (t *Single) recordPosition() { if t.offset <= 0 { return } c := &recorder.MetaData{Source: t.opt.source, Offset: t.offset} - var err error - if last { - err = recorder.SetAndFlush(t.recordKey, c) - } else { - err = recorder.Set(t.recordKey, c) - } - - if err != nil { + if err := recorder.SetAndFlush(t.recordKey, c); err != nil { t.log.Debugf("recording cache %s err: %s", c, err) } } @@ -403,7 +396,6 @@ func (t *Single) feed(pending [][]byte) { t.feedToRemote(pending) return } - defer t.recordingCache(false) t.feedToIO(pending) } @@ -505,8 +497,7 @@ func (t *Single) multiline(text []byte) []byte { if t.mult == nil { return text } - res, state := t.mult.ProcessLine(text) - multilineVec.WithLabelValues(t.opt.source, t.filepath, state.String()).Inc() + res, _ := t.mult.ProcessLine(text) return res }