Merge branch 'iss2432-update-logging-cache' into 'dev'
Optimize log collection performance

See merge request cloudcare-tools/datakit!3257
谭彪 committed Oct 31, 2024
2 parents cf73feb + 529c66b commit 25cba31
Showing 25 changed files with 354 additions and 92 deletions.
122 changes: 122 additions & 0 deletions internal/logtail/bytechannel/bytechannel.go
@@ -0,0 +1,122 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the MIT License.
// This product includes software developed at Guance Cloud (https://www.guance.com/).
// Copyright 2021-present Guance, Inc.

// Package bytechannel provides a thread-safe channel for sending and receiving byte slices
package bytechannel

import (
"context"
"sync/atomic"
"time"
)

type ByteChannel interface {
Send(context.Context, []byte) bool
SendSync(context.Context, []byte)
Receive() []byte
CurrentChannelSize() int64
CurrentByteSize() int64
}

type option struct {
chanSize int
capacity int64
}

type Option func(*option)

func WithChanSize(n int) Option { return func(opt *option) { opt.chanSize = n } }
func WithCapacity(n int64) Option { return func(opt *option) { opt.capacity = n } }

func defaultOption() *option {
return &option{
chanSize: 512,
capacity: 10 * 1024 * 1024, // 10MB
}
}

type byteChannel struct {
ch chan []byte
capacity int64

currentChanSize *atomic.Int64
currentSize *atomic.Int64
}

func NewByteChannel(opts ...Option) ByteChannel {
c := defaultOption()
for _, opt := range opts {
opt(c)
}

return &byteChannel{
ch: make(chan []byte, c.chanSize),
capacity: c.capacity,
currentChanSize: &atomic.Int64{},
currentSize: &atomic.Int64{},
}
}

func (bc *byteChannel) Send(ctx context.Context, data []byte) bool {
if bc.currentSize.Load()+int64(len(data)) > bc.capacity {
return false
}

select {
case <-ctx.Done():
return false
case bc.ch <- data:
bc.currentChanSize.Add(1)
bc.currentSize.Add(int64(len(data)))
return true
default:
return false
}
}

func (bc *byteChannel) SendSync(ctx context.Context, data []byte) {
sent := false
for {
select {
case <-ctx.Done():
return
default:
if bc.currentSize.Load()+int64(len(data)) < bc.capacity {
select {
case <-ctx.Done():
return
case bc.ch <- data:
bc.currentChanSize.Add(1)
bc.currentSize.Add(int64(len(data)))
sent = true
}
}
if !sent {
time.Sleep(time.Millisecond * 10)
} else {
return
}
}
}
}

func (bc *byteChannel) Receive() []byte {
select {
case data := <-bc.ch:
bc.currentChanSize.Add(-1)
bc.currentSize.Add(-int64(len(data)))
return data
default:
return nil
}
}

func (bc *byteChannel) CurrentChannelSize() int64 {
return bc.currentChanSize.Load()
}

func (bc *byteChannel) CurrentByteSize() int64 {
return bc.currentSize.Load()
}
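
For orientation, here is a minimal usage sketch of the new bytechannel API, assuming a caller inside the datakit module (the sizes, payloads, and polling loop are illustrative, not part of this commit):

package main

import (
	"context"
	"fmt"
	"time"

	"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/logtail/bytechannel"
)

func main() {
	bc := bytechannel.NewByteChannel(
		bytechannel.WithChanSize(128),       // buffer up to 128 slices
		bytechannel.WithCapacity(1024*1024), // and at most 1 MiB of bytes
	)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Non-blocking send: returns false when the channel is full or the
	// byte capacity would be exceeded.
	if !bc.Send(ctx, []byte("log line\n")) {
		fmt.Println("dropped: channel saturated")
	}

	// Blocking send: retries every 10ms until the payload fits or the
	// context is cancelled.
	bc.SendSync(ctx, []byte("must-deliver line\n"))

	// Non-blocking receive: returns nil when the channel is empty.
	for data := bc.Receive(); data != nil; data = bc.Receive() {
		fmt.Printf("got %d bytes (%d queued, %d buffered)\n",
			len(data), bc.CurrentChannelSize(), bc.CurrentByteSize())
	}
}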
2 changes: 1 addition & 1 deletion internal/logtail/multiline/multiline.go
@@ -14,7 +14,7 @@ import (
)

const (
defaultMaxLength = 4 * 1024 * 1024
defaultMaxLength = 1 * 1024 * 1024 // 1MB
defaultMaxLifeDuration = time.Second * 5
)

Expand Down
4 changes: 2 additions & 2 deletions internal/logtail/reader/option.go
@@ -19,8 +19,8 @@ func DisablePreviousBlock() Option { return func(opt *option) { opt.disablePre

func defaultOption() *option {
return &option{
bufSize: 1024 * 16, // 16 KiB
maxLineLength: 1024 * 128, // 128 KiB
bufSize: 1024 * 128, // 128 KiB
maxLineLength: 1024 * 512, // 512 KiB
disablePreviousBlock: false,
}
}
63 changes: 59 additions & 4 deletions internal/logtail/reader/readlines.go
@@ -15,7 +15,9 @@ import (
var ErrReadEmpty = errors.New("read 0")

type Reader interface {
SetReader(io.Reader)
ReadLines() ([][]byte, int, error)
ReadLineBlock() ([]byte, int, error)
}

type reader struct {
@@ -37,6 +39,11 @@ func NewReader(rd io.Reader, opts ...Option) Reader {
}
}

func (r *reader) SetReader(rd io.Reader) {
// Avoid constructing a new Reader here, which would discard the buffered previousBlock
r.rd = rd
}

func (r *reader) ReadLines() ([][]byte, int, error) {
n, err := r.rd.Read(r.buf)
if err != nil && err != io.EOF {
@@ -48,13 +55,23 @@ func (r *reader) ReadLines() ([][]byte, int, error) {

dst := make([]byte, n)
copy(dst, r.buf[:n])
return r.split(dst), n, nil
return r.splitLines(dst), n, nil
}

var splitCharacter = []byte{'\n'}
func (r *reader) ReadLineBlock() ([]byte, int, error) {
n, err := r.rd.Read(r.buf)
if err != nil && err != io.EOF {
return nil, n, err
}
if n == 0 {
return nil, n, ErrReadEmpty
}

func (r *reader) split(b []byte) [][]byte {
lines := bytes.Split(b, splitCharacter)
return r.splitLineBlock(r.buf[:n]), n, nil
}

func (r *reader) splitLines(b []byte) [][]byte {
lines := SplitLines(b)
if len(lines) == 0 {
return nil
}
@@ -89,3 +106,41 @@ func (r *reader) split(b []byte) [][]byte {
res = append(res, lines...)
return res
}

var splitCharacter = []byte{'\n'}

func (r *reader) splitLineBlock(b []byte) []byte {
var block []byte

index := bytes.LastIndex(b, splitCharacter)
if index == -1 {
r.previousBlock = append(r.previousBlock, b...)
} else {
block = make([]byte, len(r.previousBlock)+index+1)

copy(block, r.previousBlock)
previousBlockLen := len(r.previousBlock)
r.previousBlock = nil

copy(block[previousBlockLen:], b[:index+1])
r.previousBlock = append(r.previousBlock, b[index+1:]...)
}

if len(r.previousBlock) > r.opt.maxLineLength {
if len(block) == 0 {
block = make([]byte, len(r.previousBlock))
copy(block, r.previousBlock)
} else {
// FIXME: lint error?
// nolint
block = append(block, r.previousBlock...)
}
r.previousBlock = nil
}

return block
}

func SplitLines(b []byte) [][]byte {
return bytes.Split(b, splitCharacter)
}
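
A sketch of how a caller might drive the new block-oriented API, including the SetReader hook after log rotation (the file path and polling interval are assumptions for illustration, not code from this commit):

package main

import (
	"fmt"
	"os"
	"time"

	"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/logtail/reader"
)

func main() {
	f, err := os.Open("/var/log/app.log") // hypothetical log file
	if err != nil {
		panic(err)
	}
	defer f.Close()

	r := reader.NewReader(f,
		reader.WithBufSize(1024*128)) // matches the new 128 KiB default

	// Tail forever: ReadLineBlock returns whole blocks ending at the
	// last '\n' read; an incomplete trailing line is carried over in
	// previousBlock until the next read completes it.
	for {
		block, n, err := r.ReadLineBlock()
		if err != nil {
			// ErrReadEmpty means no new data yet; if the file was
			// rotated, reopen it and call r.SetReader(newFile) so the
			// partial line held in previousBlock is not lost.
			if err == reader.ErrReadEmpty {
				time.Sleep(100 * time.Millisecond)
				continue
			}
			panic(err)
		}
		if len(block) == 0 {
			continue // read n bytes, but no newline yet: still buffering
		}
		fmt.Printf("read %d bytes, emitting a %d-byte block\n", n, len(block))
	}
}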
34 changes: 31 additions & 3 deletions internal/logtail/reader/readlines_test.go
@@ -19,7 +19,6 @@ func TestReadLines(t *testing.T) {
buf.WriteString(mockdata)

size := 10

r := NewReader(&buf, WithBufSize(size))

for i := 0; i <= len(mockdata)/size; i++ {
@@ -49,14 +48,43 @@ func TestReadLines(t *testing.T) {
assert.Equal(t, ErrReadEmpty, err)
}

func TestSplit(t *testing.T) {
func TestSplitLines(t *testing.T) {
r := &reader{
opt: defaultOption(),
}
res := r.split([]byte(mockdata))
res := r.splitLines([]byte(mockdata))
assert.Equal(t, 2, len(res))
assert.Equal(t, []byte("0123456789"), res[0])
assert.Equal(t, []byte("abcde"), res[1])

assert.Equal(t, []byte("ABCDE"), r.previousBlock)
}

func TestSplitLineBlock(t *testing.T) {
buf := bytes.Buffer{}
buf.WriteString("0123456789\nabcde\nABCDEFGHIGKL")

size := 10
r := &reader{
opt: &option{maxLineLength: 10},
rd: &buf,
buf: make([]byte, size),
}

for i := 0; i <= len(mockdata)/size; i++ {
switch i {
case 0:
block, _, _ := r.ReadLineBlock()
assert.Equal(t, []byte(nil), block)
case 1:
block, _, _ := r.ReadLineBlock()
assert.Equal(t, []byte("0123456789\nabcde\n"), block)
case 2:
block, _, _ := r.ReadLineBlock()
assert.Equal(t, []byte("ABCDEFGHIGKL"), block)
case 3:
block, _, _ := r.ReadLineBlock()
assert.Equal(t, []byte(nil), block)
}
}
}
1 change: 1 addition & 0 deletions internal/plugins/inputs/apache/input.go
@@ -127,6 +127,7 @@ func (ipt *Input) RunPipeline() {
tailer.WithPipeline(ipt.Log.Pipeline),
tailer.WithIgnoreStatus(ipt.Log.IgnoreStatus),
tailer.WithCharacterEncoding(ipt.Log.CharacterEncoding),
tailer.EnableMultiline(true),
tailer.WithMultilinePatterns([]string{`^\[\w+ \w+ \d+`}),
tailer.WithGlobalTags(inputs.MergeTags(ipt.Tagger.HostTags(), ipt.Tags, "")),
tailer.EnableDebugFields(config.Cfg.EnableDebugFields),
1 change: 1 addition & 0 deletions internal/plugins/inputs/container/const.go
@@ -57,6 +57,7 @@ const sampleCfg = `
## Set true to enable election for k8s metric collection
election = true
logging_enable_multiline = true
logging_auto_multiline_detection = true
logging_auto_multiline_extra_patterns = []
4 changes: 3 additions & 1 deletion internal/plugins/inputs/container/container_log.go
@@ -19,7 +19,7 @@ import (
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/tailer"
)

const defaultActiveDuration = time.Hour * 3
const defaultActiveDuration = time.Hour * 1

func (c *container) cleanMissingContainerLog(newIDs []string) {
missingIDs := c.logTable.findDifferences(newIDs)
@@ -53,8 +53,10 @@ func (c *container) tailingLogs(ins *logInstance) {
tailer.WithPipeline(cfg.Pipeline),
tailer.EnableDebugFields(config.Cfg.EnableDebugFields),
tailer.WithCharacterEncoding(cfg.CharacterEncoding),
tailer.EnableMultiline(c.ipt.LoggingEnableMultline),
tailer.WithMultilinePatterns(cfg.MultilinePatterns),
tailer.WithGlobalTags(mergedTags),
tailer.WithMaxMultilineLength(int64(float64(config.Cfg.Dataway.MaxRawBodySize) * 0.8)),
tailer.WithMaxMultilineLifeDuration(c.ipt.LoggingMaxMultilineLifeDuration),
tailer.WithRemoveAnsiEscapeCodes(cfg.RemoveAnsiEscapeCodes || c.ipt.LoggingRemoveAnsiEscapeCodes),
tailer.WithMaxForceFlushLimit(c.ipt.LoggingForceFlushLimit),
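
The multiline cap above is derived from the Dataway upload limit rather than hard-coded, so a fully assembled multiline entry always fits in a single request body. A worked sketch of the derivation (the 10 MiB limit is illustrative, not the confirmed default):

// With a 10 MiB raw-body limit:
//   int64(float64(10*1024*1024) * 0.8) == 8388608 bytes (8 MiB),
// leaving 20% headroom for encoding overhead and other payload fields.
func maxMultilineLength(maxRawBodySize int64) int64 {
	return int64(float64(maxRawBodySize) * 0.8)
}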
8 changes: 8 additions & 0 deletions internal/plugins/inputs/container/env.go
@@ -89,6 +89,7 @@ func (ipt *Input) GetENVDoc() []*inputs.ENVInfo {
// ENV_INPUT_CONTAINER_LOGGING_SEARCH_INTERVAL : string ("10s")
// ENV_INPUT_CONTAINER_LOGGING_EXTRA_SOURCE_MAP : string
// ENV_INPUT_CONTAINER_LOGGING_SOURCE_MULTILINE_MAP_JSON : string (JSON map)
// ENV_INPUT_CONTAINER_LOGGING_ENABLE_MULTILINE : boolean
// ENV_INPUT_CONTAINER_LOGGING_AUTO_MULTILINE_DETECTION : boolean
// ENV_INPUT_CONTAINER_LOGGING_AUTO_MULTILINE_EXTRA_PATTERNS_JSON : string (JSON string array)
// ENV_INPUT_CONTAINER_LOGGING_MAX_MULTILINE_LIFE_DURATION : string ("5s")
@@ -288,6 +289,13 @@ func (ipt *Input) ReadEnv(envs map[string]string) {
ipt.LoggingAutoMultilineDetection = b
}
}
if str, ok := envs["ENV_INPUT_CONTAINER_LOGGING_ENABLE_MULTILINE"]; ok {
if b, err := strconv.ParseBool(str); err != nil {
l.Warnf("parse ENV_INPUT_CONTAINER_LOGGING_ENABLE_MULTILINE to bool: %s, ignore", err)
} else {
ipt.LoggingEnableMultline = b
}
}
if str, ok := envs["ENV_INPUT_CONTAINER_LOGGING_AUTO_MULTILINE_EXTRA_PATTERNS_JSON"]; ok {
if err := json.Unmarshal([]byte(str), &ipt.LoggingAutoMultilineExtraPatterns); err != nil {
l.Warnf("parse ENV_INPUT_CONTAINER_LOGGING_AUTO_MULTILINE_EXTRA_PATTERNS_JSON to map: %s, ignore", err)
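
A sketch of how the new switch behaves when fed through ReadEnv, as it might appear in the package's own tests (the map literal stands in for the container's environment):

ipt := newInput()
ipt.ReadEnv(map[string]string{
	"ENV_INPUT_CONTAINER_LOGGING_ENABLE_MULTILINE": "false",
})
// ipt.LoggingEnableMultline is now false, overriding the default of
// true set in newInput(); a value that fails strconv.ParseBool is
// logged with Warnf and ignored, leaving the default in place.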
2 changes: 2 additions & 0 deletions internal/plugins/inputs/container/input.go
@@ -54,6 +54,7 @@ type Input struct {
ContainerMaxConcurrent int `toml:"container_max_concurrent"`
ContainerIncludeLog []string `toml:"container_include_log"`
ContainerExcludeLog []string `toml:"container_exclude_log"`
LoggingEnableMultline bool `toml:"logging_enable_multiline"`
LoggingExtraSourceMap map[string]string `toml:"logging_extra_source_map"`
LoggingSourceMultilineMap map[string]string `toml:"logging_source_multiline_map"`
LoggingAutoMultilineDetection bool `toml:"logging_auto_multiline_detection"`
@@ -135,6 +136,7 @@ func newInput() *Input {
EnableK8sEvent: true,
EnableK8sNodeLocal: true,
Tags: make(map[string]string),
LoggingEnableMultline: true,
LoggingExtraSourceMap: make(map[string]string),
LoggingSourceMultilineMap: make(map[string]string),
Election: true,
1 change: 1 addition & 0 deletions internal/plugins/inputs/elasticsearch/input.go
@@ -521,6 +521,7 @@ func (ipt *Input) RunPipeline() {
tailer.WithPipeline(ipt.Log.Pipeline),
tailer.WithIgnoreStatus(ipt.Log.IgnoreStatus),
tailer.WithCharacterEncoding(ipt.Log.CharacterEncoding),
tailer.EnableMultiline(true),
tailer.WithMultilinePatterns([]string{ipt.Log.MultilineMatch}),
tailer.WithGlobalTags(inputs.MergeTags(ipt.tagger.HostTags(), ipt.Tags, "")),
tailer.EnableDebugFields(config.Cfg.EnableDebugFields),
1 change: 1 addition & 0 deletions internal/plugins/inputs/influxdb/input.go
@@ -128,6 +128,7 @@ func (ipt *Input) RunPipeline() {
tailer.WithPipeline(ipt.Log.Pipeline),
tailer.WithIgnoreStatus(ipt.Log.IgnoreStatus),
tailer.WithCharacterEncoding(ipt.Log.CharacterEncoding),
tailer.EnableMultiline(true),
tailer.WithMultilinePatterns([]string{ipt.Log.MultilineMatch}),
tailer.WithGlobalTags(inputs.MergeTags(ipt.Tagger.HostTags(), ipt.Tags, "")),
tailer.EnableDebugFields(config.Cfg.EnableDebugFields),
1 change: 1 addition & 0 deletions internal/plugins/inputs/jenkins/input.go
@@ -181,6 +181,7 @@ func (ipt *Input) RunPipeline() {
tailer.WithPipeline(ipt.Log.Pipeline),
tailer.WithIgnoreStatus(ipt.Log.IgnoreStatus),
tailer.WithCharacterEncoding(ipt.Log.CharacterEncoding),
tailer.EnableMultiline(true),
tailer.WithMultilinePatterns([]string{`^\d{4}-\d{2}-\d{2}`}),
tailer.WithGlobalTags(inputs.MergeTags(ipt.Tagger.HostTags(), ipt.Tags, "")),
tailer.EnableDebugFields(config.Cfg.EnableDebugFields),
1 change: 1 addition & 0 deletions internal/plugins/inputs/kafka/input.go
@@ -69,6 +69,7 @@ func (ipt *Input) RunPipeline() {
tailer.WithPipeline(ipt.Log.Pipeline),
tailer.WithIgnoreStatus(ipt.Log.IgnoreStatus),
tailer.WithCharacterEncoding(ipt.Log.CharacterEncoding),
tailer.EnableMultiline(true),
tailer.WithMultilinePatterns([]string{ipt.Log.MultilineMatch}),
tailer.WithGlobalTags(inputs.MergeTags(ipt.Tagger.HostTags(), ipt.Tags, "")),
tailer.EnableDebugFields(config.Cfg.EnableDebugFields),