diff --git a/internal/logtail/bytechannel/bytechannel.go b/internal/logtail/bytechannel/bytechannel.go new file mode 100644 index 0000000000..fb147344ea --- /dev/null +++ b/internal/logtail/bytechannel/bytechannel.go @@ -0,0 +1,122 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the MIT License. +// This product includes software developed at Guance Cloud (https://www.guance.com/). +// Copyright 2021-present Guance, Inc. + +// Package bytechannel provides a thread-safe channel for sending and receiving byte slices +package bytechannel + +import ( + "context" + "sync/atomic" + "time" +) + +type ByteChannel interface { + Send(context.Context, []byte) bool + SendSync(context.Context, []byte) + Receive() []byte + CurrentChannelSize() int64 + CurrentByteSize() int64 +} + +type option struct { + chanSize int + capacity int64 +} + +type Option func(*option) + +func WithChanSize(n int) Option { return func(opt *option) { opt.chanSize = n } } +func WithCapacity(n int64) Option { return func(opt *option) { opt.capacity = n } } + +func defaultOption() *option { + return &option{ + chanSize: 512, + capacity: 10 * 1024 * 1024, // 10MB + } +} + +type byteChannel struct { + ch chan []byte + capacity int64 + + currentChanSize *atomic.Int64 + currentSize *atomic.Int64 +} + +func NewByteChannel(opts ...Option) ByteChannel { + c := defaultOption() + for _, opt := range opts { + opt(c) + } + + return &byteChannel{ + ch: make(chan []byte, c.chanSize), + capacity: c.capacity, + currentChanSize: &atomic.Int64{}, + currentSize: &atomic.Int64{}, + } +} + +func (bc *byteChannel) Send(ctx context.Context, data []byte) bool { + if bc.currentSize.Load()+int64(len(data)) > bc.capacity { + return false + } + + select { + case <-ctx.Done(): + return false + case bc.ch <- data: + bc.currentChanSize.Add(1) + bc.currentSize.Add(int64(len(data))) + return true + default: + return false + } +} + +func (bc *byteChannel) SendSync(ctx context.Context, data []byte) { + sended := false + for { + select { + case <-ctx.Done(): + return + default: + if bc.currentSize.Load()+int64(len(data)) < bc.capacity { + select { + case <-ctx.Done(): + return + case bc.ch <- data: + bc.currentChanSize.Add(1) + bc.currentSize.Add(int64(len(data))) + sended = true + } + } + if !sended { + time.Sleep(time.Millisecond * 10) + } else { + return + } + } + } +} + +func (bc *byteChannel) Receive() []byte { + select { + case data := <-bc.ch: + bc.currentChanSize.Add(-1) + bc.currentSize.Add(-int64(len(data))) + return data + default: + return nil + } +} + +func (bc *byteChannel) CurrentChannelSize() int64 { + return bc.currentChanSize.Load() +} + +func (bc *byteChannel) CurrentByteSize() int64 { + return bc.currentSize.Load() +} diff --git a/internal/logtail/multiline/multiline.go b/internal/logtail/multiline/multiline.go index ced7b5784e..0b0721d0df 100644 --- a/internal/logtail/multiline/multiline.go +++ b/internal/logtail/multiline/multiline.go @@ -14,7 +14,7 @@ import ( ) const ( - defaultMaxLength = 4 * 1024 * 1024 + defaultMaxLength = 1 * 1024 * 1024 // 1MB defaultMaxLifeDuration = time.Second * 5 ) diff --git a/internal/logtail/reader/option.go b/internal/logtail/reader/option.go index 1a5a954a9a..2b4e6cf2b7 100644 --- a/internal/logtail/reader/option.go +++ b/internal/logtail/reader/option.go @@ -19,8 +19,8 @@ func DisablePreviousBlock() Option { return func(opt *option) { opt.disablePre func defaultOption() *option { return &option{ - bufSize: 1024 * 16, // 16 KiB - maxLineLength: 1024 * 128, // 128 KiB + bufSize: 1024 * 128, // 128 KB + maxLineLength: 1024 * 512, // 512 KB disablePreviousBlock: false, } } diff --git a/internal/logtail/reader/readlines.go b/internal/logtail/reader/readlines.go index 558fd01490..c3921ba3b0 100644 --- a/internal/logtail/reader/readlines.go +++ b/internal/logtail/reader/readlines.go @@ -15,7 +15,9 @@ import ( var ErrReadEmpty = errors.New("read 0") type Reader interface { + SetReader(io.Reader) ReadLines() ([][]byte, int, error) + ReadLineBlock() ([]byte, int, error) } type reader struct { @@ -37,6 +39,11 @@ func NewReader(rd io.Reader, opts ...Option) Reader { } } +func (r *reader) SetReader(rd io.Reader) { + // 避免再次 NewReader 导致 previousBlock 失效 + r.rd = rd +} + func (r *reader) ReadLines() ([][]byte, int, error) { n, err := r.rd.Read(r.buf) if err != nil && err != io.EOF { @@ -48,13 +55,23 @@ func (r *reader) ReadLines() ([][]byte, int, error) { dst := make([]byte, n) copy(dst, r.buf[:n]) - return r.split(dst), n, nil + return r.splitLines(dst), n, nil } -var splitCharacter = []byte{'\n'} +func (r *reader) ReadLineBlock() ([]byte, int, error) { + n, err := r.rd.Read(r.buf) + if err != nil && err != io.EOF { + return nil, n, err + } + if n == 0 { + return nil, n, ErrReadEmpty + } -func (r *reader) split(b []byte) [][]byte { - lines := bytes.Split(b, splitCharacter) + return r.splitLineBlock(r.buf[:n]), n, nil +} + +func (r *reader) splitLines(b []byte) [][]byte { + lines := SplitLines(b) if len(lines) == 0 { return nil } @@ -89,3 +106,41 @@ func (r *reader) split(b []byte) [][]byte { res = append(res, lines...) return res } + +var splitCharacter = []byte{'\n'} + +func (r *reader) splitLineBlock(b []byte) []byte { + var block []byte + + index := bytes.LastIndex(b, splitCharacter) + if index == -1 { + r.previousBlock = append(r.previousBlock, b...) + } else { + block = make([]byte, len(r.previousBlock)+index+1) + + copy(block, r.previousBlock) + previousBlockLen := len(r.previousBlock) + r.previousBlock = nil + + copy(block[previousBlockLen:], b[:index+1]) + r.previousBlock = append(r.previousBlock, b[index+1:]...) + } + + if len(r.previousBlock) > r.opt.maxLineLength { + if len(block) == 0 { + block = make([]byte, len(r.previousBlock)) + copy(block, r.previousBlock) + } else { + // FIXME: lint error? + // nolint + block = append(block, r.previousBlock...) + } + r.previousBlock = nil + } + + return block +} + +func SplitLines(b []byte) [][]byte { + return bytes.Split(b, splitCharacter) +} diff --git a/internal/logtail/reader/readlines_test.go b/internal/logtail/reader/readlines_test.go index 122b354177..b90ba793a9 100644 --- a/internal/logtail/reader/readlines_test.go +++ b/internal/logtail/reader/readlines_test.go @@ -19,7 +19,6 @@ func TestReadLines(t *testing.T) { buf.WriteString(mockdata) size := 10 - r := NewReader(&buf, WithBufSize(size)) for i := 0; i <= len(mockdata)/size; i++ { @@ -49,14 +48,43 @@ func TestReadLines(t *testing.T) { assert.Equal(t, ErrReadEmpty, err) } -func TestSplit(t *testing.T) { +func TestSplitLines(t *testing.T) { r := &reader{ opt: defaultOption(), } - res := r.split([]byte(mockdata)) + res := r.splitLines([]byte(mockdata)) assert.Equal(t, 2, len(res)) assert.Equal(t, []byte("0123456789"), res[0]) assert.Equal(t, []byte("abcde"), res[1]) assert.Equal(t, []byte("ABCDE"), r.previousBlock) } + +func TestSplitLineBlock(t *testing.T) { + buf := bytes.Buffer{} + buf.WriteString("0123456789\nabcde\nABCDEFGHIGKL") + + size := 10 + r := &reader{ + opt: &option{maxLineLength: 10}, + rd: &buf, + buf: make([]byte, size), + } + + for i := 0; i <= len(mockdata)/size; i++ { + switch i { + case 0: + block, _, _ := r.ReadLineBlock() + assert.Equal(t, []byte(nil), block) + case 1: + block, _, _ := r.ReadLineBlock() + assert.Equal(t, []byte("0123456789\nabcde\n"), block) + case 2: + block, _, _ := r.ReadLineBlock() + assert.Equal(t, []byte("ABCDEFGHIGKL"), block) + case 3: + block, _, _ := r.ReadLineBlock() + assert.Equal(t, []byte(nil), block) + } + } +} diff --git a/internal/plugins/inputs/apache/input.go b/internal/plugins/inputs/apache/input.go index 1973663871..6a58ba6b90 100644 --- a/internal/plugins/inputs/apache/input.go +++ b/internal/plugins/inputs/apache/input.go @@ -127,6 +127,7 @@ func (ipt *Input) RunPipeline() { tailer.WithPipeline(ipt.Log.Pipeline), tailer.WithIgnoreStatus(ipt.Log.IgnoreStatus), tailer.WithCharacterEncoding(ipt.Log.CharacterEncoding), + tailer.EnableMultiline(true), tailer.WithMultilinePatterns([]string{`^\[\w+ \w+ \d+`}), tailer.WithGlobalTags(inputs.MergeTags(ipt.Tagger.HostTags(), ipt.Tags, "")), tailer.EnableDebugFields(config.Cfg.EnableDebugFields), diff --git a/internal/plugins/inputs/container/const.go b/internal/plugins/inputs/container/const.go index 2a4ae3ecc5..c730cab88d 100644 --- a/internal/plugins/inputs/container/const.go +++ b/internal/plugins/inputs/container/const.go @@ -57,6 +57,7 @@ const sampleCfg = ` ## Set true to enable election for k8s metric collection election = true + logging_enable_multiline = true logging_auto_multiline_detection = true logging_auto_multiline_extra_patterns = [] diff --git a/internal/plugins/inputs/container/container_log.go b/internal/plugins/inputs/container/container_log.go index 42d6feee05..907968ff3b 100644 --- a/internal/plugins/inputs/container/container_log.go +++ b/internal/plugins/inputs/container/container_log.go @@ -19,7 +19,7 @@ import ( "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/tailer" ) -const defaultActiveDuration = time.Hour * 3 +const defaultActiveDuration = time.Hour * 1 func (c *container) cleanMissingContainerLog(newIDs []string) { missingIDs := c.logTable.findDifferences(newIDs) @@ -53,8 +53,10 @@ func (c *container) tailingLogs(ins *logInstance) { tailer.WithPipeline(cfg.Pipeline), tailer.EnableDebugFields(config.Cfg.EnableDebugFields), tailer.WithCharacterEncoding(cfg.CharacterEncoding), + tailer.EnableMultiline(c.ipt.LoggingEnableMultline), tailer.WithMultilinePatterns(cfg.MultilinePatterns), tailer.WithGlobalTags(mergedTags), + tailer.WithMaxMultilineLength(int64(float64(config.Cfg.Dataway.MaxRawBodySize) * 0.8)), tailer.WithMaxMultilineLifeDuration(c.ipt.LoggingMaxMultilineLifeDuration), tailer.WithRemoveAnsiEscapeCodes(cfg.RemoveAnsiEscapeCodes || c.ipt.LoggingRemoveAnsiEscapeCodes), tailer.WithMaxForceFlushLimit(c.ipt.LoggingForceFlushLimit), diff --git a/internal/plugins/inputs/container/env.go b/internal/plugins/inputs/container/env.go index c814b4054c..6e62e4fda6 100644 --- a/internal/plugins/inputs/container/env.go +++ b/internal/plugins/inputs/container/env.go @@ -89,6 +89,7 @@ func (ipt *Input) GetENVDoc() []*inputs.ENVInfo { // ENV_INPUT_CONTAINER_LOGGING_SEARCH_INTERVAL : string ("10s") // ENV_INPUT_CONTAINER_LOGGING_EXTRA_SOURCE_MAP : string // ENV_INPUT_CONTAINER_LOGGING_SOURCE_MULTILINE_MAP_JSON : string (JSON map) +// ENV_INPUT_CONTAINER_LOGGING_ENABLE_MULTILINE: booler // ENV_INPUT_CONTAINER_LOGGING_AUTO_MULTILINE_DETECTION: booler // ENV_INPUT_CONTAINER_LOGGING_AUTO_MULTILINE_EXTRA_PATTERNS_JSON : string (JSON string array) // ENV_INPUT_CONTAINER_LOGGING_MAX_MULTILINE_LIFE_DURATION : string ("5s") @@ -288,6 +289,13 @@ func (ipt *Input) ReadEnv(envs map[string]string) { ipt.LoggingAutoMultilineDetection = b } } + if str, ok := envs["ENV_INPUT_CONTAINER_LOGGING_ENABLE_MULTILINE"]; ok { + if b, err := strconv.ParseBool(str); err != nil { + l.Warnf("parse ENV_INPUT_CONTAINER_LOGGING_ENABLE_MULTILINE to bool: %s, ignore", err) + } else { + ipt.LoggingEnableMultline = b + } + } if str, ok := envs["ENV_INPUT_CONTAINER_LOGGING_AUTO_MULTILINE_EXTRA_PATTERNS_JSON"]; ok { if err := json.Unmarshal([]byte(str), &ipt.LoggingAutoMultilineExtraPatterns); err != nil { l.Warnf("parse ENV_INPUT_CONTAINER_LOGGING_AUTO_MULTILINE_EXTRA_PATTERNS_JSON to map: %s, ignore", err) diff --git a/internal/plugins/inputs/container/input.go b/internal/plugins/inputs/container/input.go index 82d55acb27..a49cacf67e 100644 --- a/internal/plugins/inputs/container/input.go +++ b/internal/plugins/inputs/container/input.go @@ -54,6 +54,7 @@ type Input struct { ContainerMaxConcurrent int `toml:"container_max_concurrent"` ContainerIncludeLog []string `toml:"container_include_log"` ContainerExcludeLog []string `toml:"container_exclude_log"` + LoggingEnableMultline bool `toml:"logging_enable_multiline"` LoggingExtraSourceMap map[string]string `toml:"logging_extra_source_map"` LoggingSourceMultilineMap map[string]string `toml:"logging_source_multiline_map"` LoggingAutoMultilineDetection bool `toml:"logging_auto_multiline_detection"` @@ -135,6 +136,7 @@ func newInput() *Input { EnableK8sEvent: true, EnableK8sNodeLocal: true, Tags: make(map[string]string), + LoggingEnableMultline: true, LoggingExtraSourceMap: make(map[string]string), LoggingSourceMultilineMap: make(map[string]string), Election: true, diff --git a/internal/plugins/inputs/elasticsearch/input.go b/internal/plugins/inputs/elasticsearch/input.go index f5cd5a2dfe..79ab5f23d4 100644 --- a/internal/plugins/inputs/elasticsearch/input.go +++ b/internal/plugins/inputs/elasticsearch/input.go @@ -521,6 +521,7 @@ func (ipt *Input) RunPipeline() { tailer.WithPipeline(ipt.Log.Pipeline), tailer.WithIgnoreStatus(ipt.Log.IgnoreStatus), tailer.WithCharacterEncoding(ipt.Log.CharacterEncoding), + tailer.EnableMultiline(true), tailer.WithMultilinePatterns([]string{ipt.Log.MultilineMatch}), tailer.WithGlobalTags(inputs.MergeTags(ipt.tagger.HostTags(), ipt.Tags, "")), tailer.EnableDebugFields(config.Cfg.EnableDebugFields), diff --git a/internal/plugins/inputs/influxdb/input.go b/internal/plugins/inputs/influxdb/input.go index 469f7f9566..64e51661bd 100644 --- a/internal/plugins/inputs/influxdb/input.go +++ b/internal/plugins/inputs/influxdb/input.go @@ -128,6 +128,7 @@ func (ipt *Input) RunPipeline() { tailer.WithPipeline(ipt.Log.Pipeline), tailer.WithIgnoreStatus(ipt.Log.IgnoreStatus), tailer.WithCharacterEncoding(ipt.Log.CharacterEncoding), + tailer.EnableMultiline(true), tailer.WithMultilinePatterns([]string{ipt.Log.MultilineMatch}), tailer.WithGlobalTags(inputs.MergeTags(ipt.Tagger.HostTags(), ipt.Tags, "")), tailer.EnableDebugFields(config.Cfg.EnableDebugFields), diff --git a/internal/plugins/inputs/jenkins/input.go b/internal/plugins/inputs/jenkins/input.go index 94d8b7b6d9..097588e743 100644 --- a/internal/plugins/inputs/jenkins/input.go +++ b/internal/plugins/inputs/jenkins/input.go @@ -181,6 +181,7 @@ func (ipt *Input) RunPipeline() { tailer.WithPipeline(ipt.Log.Pipeline), tailer.WithIgnoreStatus(ipt.Log.IgnoreStatus), tailer.WithCharacterEncoding(ipt.Log.CharacterEncoding), + tailer.EnableMultiline(true), tailer.WithMultilinePatterns([]string{`^\d{4}-\d{2}-\d{2}`}), tailer.WithGlobalTags(inputs.MergeTags(ipt.Tagger.HostTags(), ipt.Tags, "")), tailer.EnableDebugFields(config.Cfg.EnableDebugFields), diff --git a/internal/plugins/inputs/kafka/input.go b/internal/plugins/inputs/kafka/input.go index 776d23c12c..e29d8f5738 100644 --- a/internal/plugins/inputs/kafka/input.go +++ b/internal/plugins/inputs/kafka/input.go @@ -69,6 +69,7 @@ func (ipt *Input) RunPipeline() { tailer.WithPipeline(ipt.Log.Pipeline), tailer.WithIgnoreStatus(ipt.Log.IgnoreStatus), tailer.WithCharacterEncoding(ipt.Log.CharacterEncoding), + tailer.EnableMultiline(true), tailer.WithMultilinePatterns([]string{ipt.Log.MultilineMatch}), tailer.WithGlobalTags(inputs.MergeTags(ipt.Tagger.HostTags(), ipt.Tags, "")), tailer.EnableDebugFields(config.Cfg.EnableDebugFields), diff --git a/internal/plugins/inputs/logging/input.go b/internal/plugins/inputs/logging/input.go index dc70effed7..2d3674af2b 100644 --- a/internal/plugins/inputs/logging/input.go +++ b/internal/plugins/inputs/logging/input.go @@ -153,6 +153,8 @@ func (ipt *Input) Run() { tailer.WithFromBeginning(ipt.FromBeginning), tailer.WithCharacterEncoding(ipt.CharacterEncoding), tailer.WithIgnoreDeadLog(ignoreDuration), + tailer.EnableMultiline(ipt.AutoMultilineDetection), + tailer.WithMaxMultilineLength(int64(float64(config.Cfg.Dataway.MaxRawBodySize) * 0.8)), tailer.WithGlobalTags(inputs.MergeTags(ipt.Tagger.HostTags(), ipt.Tags, "")), tailer.WithRemoveAnsiEscapeCodes(ipt.RemoveAnsiEscapeCodes), tailer.WithDone(ipt.semStop.Wait()), diff --git a/internal/plugins/inputs/mongodb/input.go b/internal/plugins/inputs/mongodb/input.go index 8480a389c4..fe24ee1963 100644 --- a/internal/plugins/inputs/mongodb/input.go +++ b/internal/plugins/inputs/mongodb/input.go @@ -226,6 +226,7 @@ func (ipt *Input) RunPipeline() { tailer.WithPipeline(ipt.MgoDBLog.Pipeline), tailer.WithIgnoreStatus(ipt.MgoDBLog.IgnoreStatus), tailer.WithCharacterEncoding(ipt.MgoDBLog.CharacterEncoding), + tailer.EnableMultiline(true), tailer.WithMultilinePatterns([]string{ipt.MgoDBLog.MultilineMatch}), tailer.WithGlobalTags(inputs.MergeTags(ipt.Tagger.HostTags(), ipt.Tags, "")), tailer.EnableDebugFields(config.Cfg.EnableDebugFields), diff --git a/internal/plugins/inputs/mysql/input.go b/internal/plugins/inputs/mysql/input.go index a8e9e54556..994466f05f 100644 --- a/internal/plugins/inputs/mysql/input.go +++ b/internal/plugins/inputs/mysql/input.go @@ -671,6 +671,7 @@ func (ipt *Input) RunPipeline() { tailer.WithPipeline(ipt.Log.Pipeline), tailer.WithIgnoreStatus(ipt.Log.IgnoreStatus), tailer.WithCharacterEncoding(ipt.Log.CharacterEncoding), + tailer.EnableMultiline(true), tailer.WithMultilinePatterns([]string{ipt.Log.MultilineMatch}), tailer.WithGlobalTags(inputs.MergeTags(ipt.tagger.HostTags(), ipt.Tags, "")), tailer.EnableDebugFields(config.Cfg.EnableDebugFields), diff --git a/internal/plugins/inputs/postgresql/input.go b/internal/plugins/inputs/postgresql/input.go index 19dc2189dd..b35e40d2a9 100644 --- a/internal/plugins/inputs/postgresql/input.go +++ b/internal/plugins/inputs/postgresql/input.go @@ -856,6 +856,7 @@ func (ipt *Input) RunPipeline() { tailer.WithPipeline(ipt.Log.Pipeline), tailer.WithIgnoreStatus(ipt.Log.IgnoreStatus), tailer.WithCharacterEncoding(ipt.Log.CharacterEncoding), + tailer.EnableMultiline(true), tailer.WithMultilinePatterns([]string{ipt.Log.MultilineMatch}), tailer.WithGlobalTags(inputs.MergeTags(ipt.tagger.HostTags(), ipt.Tags, "")), tailer.EnableDebugFields(config.Cfg.EnableDebugFields), diff --git a/internal/plugins/inputs/rabbitmq/input.go b/internal/plugins/inputs/rabbitmq/input.go index ea20a70175..7cb6de50d6 100644 --- a/internal/plugins/inputs/rabbitmq/input.go +++ b/internal/plugins/inputs/rabbitmq/input.go @@ -71,6 +71,7 @@ func (ipt *Input) RunPipeline() { tailer.WithPipeline(ipt.Log.Pipeline), tailer.WithIgnoreStatus(ipt.Log.IgnoreStatus), tailer.WithCharacterEncoding(ipt.Log.CharacterEncoding), + tailer.EnableMultiline(true), tailer.WithMultilinePatterns([]string{ipt.Log.MultilineMatch}), tailer.WithGlobalTags(inputs.MergeTags(ipt.Tagger.HostTags(), ipt.Tags, "")), tailer.EnableDebugFields(config.Cfg.EnableDebugFields), diff --git a/internal/plugins/inputs/redis/input.go b/internal/plugins/inputs/redis/input.go index 806692406a..84a551aca3 100644 --- a/internal/plugins/inputs/redis/input.go +++ b/internal/plugins/inputs/redis/input.go @@ -399,6 +399,7 @@ func (ipt *Input) RunPipeline() { tailer.WithPipeline(ipt.Log.Pipeline), tailer.WithIgnoreStatus(ipt.Log.IgnoreStatus), tailer.WithCharacterEncoding(ipt.Log.CharacterEncoding), + tailer.EnableMultiline(true), tailer.WithMultilinePatterns([]string{ipt.Log.MultilineMatch}), tailer.WithGlobalTags(inputs.MergeTags(ipt.tagger.HostTags(), ipt.Tags, "")), tailer.EnableDebugFields(config.Cfg.EnableDebugFields), diff --git a/internal/plugins/inputs/solr/input.go b/internal/plugins/inputs/solr/input.go index c2be6440ba..7970f14421 100644 --- a/internal/plugins/inputs/solr/input.go +++ b/internal/plugins/inputs/solr/input.go @@ -148,6 +148,7 @@ func (ipt *Input) RunPipeline() { tailer.WithPipeline(ipt.Log.Pipeline), tailer.WithIgnoreStatus(ipt.Log.IgnoreStatus), tailer.WithCharacterEncoding(ipt.Log.CharacterEncoding), + tailer.EnableMultiline(true), tailer.WithMultilinePatterns([]string{ipt.Log.MultilineMatch}), tailer.WithGlobalTags(inputs.MergeTags(ipt.Tagger.HostTags(), ipt.Tags, "")), tailer.EnableDebugFields(config.Cfg.EnableDebugFields), diff --git a/internal/plugins/inputs/sqlserver/input.go b/internal/plugins/inputs/sqlserver/input.go index 8191f20c7b..f139df7023 100644 --- a/internal/plugins/inputs/sqlserver/input.go +++ b/internal/plugins/inputs/sqlserver/input.go @@ -354,6 +354,7 @@ func (ipt *Input) RunPipeline() { tailer.WithPipeline(ipt.Log.Pipeline), tailer.WithIgnoreStatus(ipt.Log.IgnoreStatus), tailer.WithCharacterEncoding(ipt.Log.CharacterEncoding), + tailer.EnableMultiline(true), tailer.WithMultilinePatterns([]string{`^\d{4}-\d{2}-\d{2}`}), tailer.WithGlobalTags(inputs.MergeTags(ipt.tagger.HostTags(), ipt.Tags, "")), tailer.EnableDebugFields(config.Cfg.EnableDebugFields), diff --git a/internal/tailer/metrics.go b/internal/tailer/metrics.go index 9cc19f8203..8666135b97 100644 --- a/internal/tailer/metrics.go +++ b/internal/tailer/metrics.go @@ -12,12 +12,14 @@ import ( var ( rotateVec *prometheus.CounterVec - forceFlushVec *prometheus.CounterVec parseFailVec *prometheus.CounterVec openfileVec *prometheus.GaugeVec socketLogConnect *prometheus.CounterVec socketLogCount *prometheus.CounterVec socketLogLength *prometheus.SummaryVec + + pendingBlockLength *prometheus.GaugeVec + pendingByteSize *prometheus.GaugeVec ) func setupMetrics() { @@ -34,19 +36,6 @@ func setupMetrics() { }, ) - forceFlushVec = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "datakit", - Subsystem: "tailer", - Name: "buffer_force_flush_total", - Help: "Tailer force flush total", - }, - []string{ - "source", - "filepath", - }, - ) - parseFailVec = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "datakit", @@ -77,7 +66,7 @@ func setupMetrics() { Namespace: "datakit", Subsystem: "input_logging_socket", Name: "connect_status_total", - Help: "connect and close count for net.conn", + Help: "Connect and close count for net.conn", }, []string{"network", "status"}) @@ -86,7 +75,7 @@ func setupMetrics() { Namespace: "datakit", Subsystem: "input_logging_socket", Name: "feed_message_count_total", - Help: "socket feed to IO message count", + Help: "Socket feed to IO message count", }, []string{ "network", @@ -97,7 +86,7 @@ func setupMetrics() { Namespace: "datakit", Subsystem: "input_logging_socket", Name: "log_length", - Help: "record the length of each log line", + Help: "Record the length of each log line", Objectives: map[float64]float64{ 0.5: 0.05, 0.90: 0.01, @@ -106,11 +95,34 @@ func setupMetrics() { }, []string{"network"}) + pendingBlockLength = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "datakit", + Subsystem: "input_logging", + Name: "pending_block_length", + Help: "The length of blocks that are pending processing", + }, + []string{"source", "filepath"}) + + pendingByteSize = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "datakit", + Subsystem: "input_logging", + Name: "pending_byte_size", + Help: "The size of bytes that are pending processing", + }, + []string{"source", "filepath"}) + metrics.MustRegister( openfileVec, + parseFailVec, + rotateVec, + socketLogLength, + socketLogCount, + socketLogConnect, + pendingBlockLength, + pendingByteSize, ) - - metrics.MustRegister(socketLogLength, socketLogCount, socketLogConnect) } //nolint:gochecknoinits diff --git a/internal/tailer/option.go b/internal/tailer/option.go index 7d490fb790..97819a6114 100644 --- a/internal/tailer/option.go +++ b/internal/tailer/option.go @@ -31,6 +31,7 @@ type option struct { // 添加 debug 字段 enableDebugFields bool + enableMultiline bool // 匹配正则表达式 // 符合此正则匹配的数据,将被认定为有效数据。否则会累积追加到上一条有效数据的末尾 // 例如 ^\d{4}-\d{2}-\d{2} 行首匹配 YYYY-MM-DD 时间格式 @@ -38,6 +39,7 @@ type option struct { multilinePatterns []string // 最大多行存在时间,避免堆积过久 maxMultilineLifeDuration time.Duration + maxMultilineLength int64 // 是否从文件起始处开始读取,如果打开此项,可能会导致大量数据重复 fromBeginning bool @@ -100,6 +102,10 @@ func WithService(s string) Option { } } +func EnableMultiline(b bool) Option { + return func(opt *option) { opt.enableMultiline = b } +} + func WithMultilinePatterns(arr []string) Option { return func(opt *option) { opt.multilinePatterns = arr } } @@ -112,6 +118,10 @@ func WithMaxMultilineLifeDuration(dur time.Duration) Option { } } +func WithMaxMultilineLength(n int64) Option { + return func(opt *option) { opt.maxMultilineLength = n } +} + func WithRemoveAnsiEscapeCodes(b bool) Option { return func(opt *option) { opt.removeAnsiEscapeCodes = b } } @@ -172,7 +182,7 @@ func defaultOption() *option { source: "default", extraTags: map[string]string{"service": "default"}, maxForceFlushLimit: 10, - fileFromBeginningThresholdSize: 1000 * 1000 * 1, // 1 MB + fileFromBeginningThresholdSize: 1000 * 1000 * 20, // 20 MB done: make(<-chan interface{}), feeder: dkio.DefaultFeeder(), } diff --git a/internal/tailer/tailer_single.go b/internal/tailer/tailer_single.go index 552d350d3e..5681091ac7 100644 --- a/internal/tailer/tailer_single.go +++ b/internal/tailer/tailer_single.go @@ -7,6 +7,7 @@ package tailer import ( "bytes" + "context" "errors" "fmt" "io" @@ -21,6 +22,7 @@ import ( dkio "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/io" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/logtail" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/logtail/ansi" + "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/logtail/bytechannel" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/logtail/multiline" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/logtail/openfile" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/logtail/reader" @@ -31,7 +33,7 @@ import ( const ( defaultSleepDuration = time.Second - checkInterval = time.Second * 5 + checkInterval = time.Second * 3 ) type Single struct { @@ -46,10 +48,9 @@ type Single struct { mult *multiline.Multiline reader reader.Reader - readTime time.Time - readLines int64 - offset int64 - flushScore int + offset int64 + readLines int64 + bc bytechannel.ByteChannel partialContentBuff bytes.Buffer @@ -68,6 +69,7 @@ func NewTailerSingle(filepath string, opts ...Option) (*Single, error) { t := &Single{ opt: c, filepath: filepath, + bc: bytechannel.NewByteChannel(), } t.buildTags(t.opt.extraTags) t.log = logger.SLogger("logging/" + t.opt.source) @@ -92,6 +94,7 @@ func (t *Single) setup() error { } t.mult, err = multiline.New(t.opt.multilinePatterns, + multiline.WithMaxLength(int(t.opt.maxMultilineLength)), multiline.WithMaxLifeDuration(t.opt.maxMultilineLifeDuration)) if err != nil { return err @@ -114,9 +117,46 @@ func (t *Single) setup() error { return nil } +var processorGo = datakit.G("tailer_processor") + func (t *Single) Run() { - t.forwardMessage() + ctx, cancel := context.WithCancel(context.Background()) + + processorGo.Go(func(_ context.Context) error { + for { + select { + case <-datakit.Exit.Wait(): + cancel() + t.log.Infof("%s processor exit", t.opt.source) + return nil + case <-t.opt.done: + cancel() + t.log.Infof("%s processor exit", t.opt.source) + return nil + case <-ctx.Done(): + t.log.Infof("%s processor exit", t.opt.source) + return nil + default: + // nil + } + + block := t.bc.Receive() + if len(block) == 0 { + time.Sleep(time.Millisecond * 100) + continue + } + + lines := reader.SplitLines(block) + t.process(t.opt.mode, lines) + + pendingBlockLength.WithLabelValues(t.opt.source, t.filepath).Set(float64(t.bc.CurrentChannelSize())) + pendingByteSize.WithLabelValues(t.opt.source, t.filepath).Set(float64(t.bc.CurrentByteSize())) + } + }) + + t.forwardMessage(ctx) t.Close() + cancel() } func (t *Single) Close() { @@ -217,33 +257,25 @@ func (t *Single) reopen() error { return err } + t.reader.SetReader(t.file) t.offset = ret - t.reader = reader.NewReader(t.file) t.inode = openfile.FileInode(t.filepath) t.recordKey = openfile.FileKey(t.filepath) t.log.Infof("reopen file %s, offset %d", t.filepath, t.offset) - rotateVec.WithLabelValues(t.opt.source, t.filepath).Inc() return nil } //nolint:cyclop -func (t *Single) forwardMessage() { +func (t *Single) forwardMessage(ctx context.Context) { checkTicker := time.NewTicker(checkInterval) defer checkTicker.Stop() for { - t.forceFlush() - select { - case <-datakit.Exit.Wait(): - t.log.Infof("exiting: file %s", t.filepath) - return - case <-t.opt.done: + case <-ctx.Done(): t.log.Infof("exiting: file %s", t.filepath) - t.collectToEOF() - t.flushCache() return case <-checkTicker.C: @@ -251,7 +283,7 @@ func (t *Single) forwardMessage() { exist := openfile.FileExists(t.filepath) if did || !exist { - t.collectToEOF() + t.readToEOF(ctx) } if did { @@ -270,62 +302,42 @@ func (t *Single) forwardMessage() { default: // nil } - if err := t.collectOnce(); err != nil { + if err := t.readOnce(ctx); err != nil { if !errors.Is(err, reader.ErrReadEmpty) { t.log.Warnf("failed to read data from file %s, error: %s", t.filepath, err) } t.wait() continue } - - t.resetFlushScore() - } -} - -func (t *Single) forceFlush() { - if t.opt.maxForceFlushLimit == -1 { - return - } - if t.flushScore >= t.opt.maxForceFlushLimit { - t.flushCache() - t.resetFlushScore() } } -func (t *Single) flushCache() { - if t.mult != nil && t.mult.BuffLength() > 0 { - text := t.mult.Flush() - logstr := removeAnsiEscapeCodes(text, t.opt.removeAnsiEscapeCodes) - t.feed([][]byte{logstr}) - - forceFlushVec.WithLabelValues(t.opt.source, t.filepath).Inc() - } -} - -func (t *Single) collectToEOF() { +func (t *Single) readToEOF(ctx context.Context) { t.log.Infof("file %s has been rotated or removed, current offset %d, try to read EOF", t.filepath, t.offset) for { - if err := t.collectOnce(); err != nil { - if !errors.Is(err, reader.ErrReadEmpty) { - t.log.Warnf("read to EOF err: %s", err) + select { + case <-ctx.Done(): + return + default: + if err := t.readOnce(ctx); err != nil { + if !errors.Is(err, reader.ErrReadEmpty) { + t.log.Warnf("read to EOF err: %s", err) + } + return } - break } } } -func (t *Single) collectOnce() error { - lines, readNum, err := t.reader.ReadLines() +func (t *Single) readOnce(ctx context.Context) error { + block, readNum, err := t.reader.ReadLineBlock() if err != nil { return err } - t.readTime = time.Now() - t.process(t.opt.mode, lines) + t.bc.SendSync(ctx, block) t.offset += int64(readNum) - t.log.Debugf("read %d bytes from file %s, offset %d", readNum, t.filepath, t.offset) - return nil } @@ -407,7 +419,6 @@ func (t *Single) feedToRemote(pending [][]byte) { } if t.opt.enableDebugFields { fields["log_read_offset"] = t.offset - fields["log_read_time"] = t.readTime.UnixNano() fields["log_file_inode"] = t.inode } @@ -433,11 +444,13 @@ func (t *Single) feedToIO(pending [][]byte) { } if t.opt.enableDebugFields { fields["log_read_offset"] = t.offset - fields["log_read_time"] = t.readTime.UnixNano() fields["log_file_inode"] = t.inode } - opts := append(point.DefaultLoggingOptions(), point.WithTime(timeNow.Add(time.Duration(i)*time.Microsecond))) + opts := append(point.DefaultLoggingOptions(), + point.WithTime(timeNow.Add(time.Duration(i)*time.Microsecond)), + point.WithPrecheck(false), + ) pt := point.NewPointV2( t.opt.source, @@ -465,11 +478,6 @@ func (t *Single) feedToIO(pending [][]byte) { func (t *Single) wait() { time.Sleep(defaultSleepDuration) - t.flushScore++ -} - -func (t *Single) resetFlushScore() { - t.flushScore = 0 } func (t *Single) buildTags(extraTags map[string]string) { @@ -487,7 +495,7 @@ func (t *Single) decode(text []byte) ([]byte, error) { } func (t *Single) multiline(text []byte) []byte { - if t.mult == nil { + if !t.opt.enableMultiline || t.mult == nil { return text } res, _ := t.mult.ProcessLine(text)