Skip to content

Commit

Permalink
删除日志采集的无关字段,部分 debug 字段通过配置开启
Browse files Browse the repository at this point in the history
  • Loading branch information
liguozhuang authored and 谭彪 committed Oct 18, 2024
1 parent 2e5e5b5 commit 72ae347
Show file tree
Hide file tree
Showing 17 changed files with 90 additions and 93 deletions.
2 changes: 2 additions & 0 deletions internal/config/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,8 @@ func (c *Config) ApplyMainConfig() error {
c.Operator = operator.NewOperatorClientFromEnv()
// 初始化 ENC 密码加密功能。
initCrypto(c)
//
fillPipelineConfig(c)
return nil
}

Expand Down
9 changes: 5 additions & 4 deletions internal/config/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ func (c *Config) loadConfdEnvs() {
}

func (c *Config) loadPprofEnvs() {
if v := datakit.GetEnv("ENV_ENABLE_DEBUG_FIELDS"); v != "" {
b, _ := strconv.ParseBool(v)
c.EnableDebugFields = b
}

if v := datakit.GetEnv("ENV_ENABLE_PPROF"); v != "" {
c.EnablePProf = true
}
Expand Down Expand Up @@ -212,10 +217,6 @@ func (c *Config) loadPipelineEnvs() {
c.Pipeline.SQLiteMemMode = true
}

if v := datakit.GetEnv("ENV_PIPELINE_DISABLE_APPEND_RUN_INFO"); v != "" {
c.Pipeline.DisableAppendRunInfo = true
}

if v := datakit.GetEnv("ENV_PIPELINE_OFFLOAD_RECEIVER"); v != "" {
if c.Pipeline.Offload == nil {
c.Pipeline.Offload = &offload.OffloadConfig{
Expand Down
8 changes: 3 additions & 5 deletions internal/config/env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,9 @@ func TestLoadEnv(t *testing.T) {
"ENV_HTTP_TLS_CRT": "/path/to/datakit/tls.crt",
"ENV_HTTP_TLS_KEY": "/path/to/datakit/tls.key",

"ENV_ENABLE_ELECTION_NAMESPACE_TAG": "ok",
"ENV_PIPELINE_OFFLOAD_RECEIVER": offload.DKRcv,
"ENV_PIPELINE_DISABLE_APPEND_RUN_INFO": "true",
"ENV_PIPELINE_OFFLOAD_ADDRESSES": "http://aaa:123,http://1.2.3.4:1234",
"ENV_ENABLE_ELECTION_NAMESPACE_TAG": "ok",
"ENV_PIPELINE_OFFLOAD_RECEIVER": offload.DKRcv,
"ENV_PIPELINE_OFFLOAD_ADDRESSES": "http://aaa:123,http://1.2.3.4:1234",
},
expect: func() *Config {
cfg := DefaultConfig()
Expand Down Expand Up @@ -159,7 +158,6 @@ func TestLoadEnv(t *testing.T) {
cfg.Logging.RotateBackups = 10
cfg.Logging.Rotate = 128

cfg.Pipeline.DisableAppendRunInfo = true
cfg.Pipeline.Offload = &offload.OffloadConfig{}
cfg.Pipeline.Offload.Receiver = offload.DKRcv
cfg.Pipeline.Offload.Addresses = []string{"http://aaa:123", "http://1.2.3.4:1234"}
Expand Down
6 changes: 6 additions & 0 deletions internal/config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,3 +295,9 @@ func GetNamespacePipelineFiles(namespace string) ([]string, error) {
}
return nil, fmt.Errorf("invalid namespace")
}

func fillPipelineConfig(c *Config) {
if c.Pipeline != nil {
c.Pipeline.EnableDebugFields = c.EnableDebugFields
}
}
9 changes: 6 additions & 3 deletions internal/config/mainconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type Config struct {

PointPool *pointPool `toml:"point_pool"`

// debug
EnableDebugFields bool `toml:"enable_debug_fields,omitempty"`
// pprof
EnablePProf bool `toml:"enable_pprof"`
PProfListen string `toml:"pprof_listen"`
Expand Down Expand Up @@ -127,9 +129,10 @@ func DefaultConfig() *Config {
GlobalHostTags: map[string]string{},
GlobalTagsDeprecated: map[string]string{},

EnablePProf: true,
PProfListen: "localhost:6060",
DatakitUser: "root",
EnableDebugFields: false,
EnablePProf: true,
PProfListen: "localhost:6060",
DatakitUser: "root",

Election: &election.ElectionCfg{
Enable: false,
Expand Down
3 changes: 1 addition & 2 deletions internal/pipeline/pl.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ const (
plTagService = "_pl_service"
plTagNS = "_pl_ns"
plStatus = "_pl_status"

plFieldCost = "_pl_cost" // data type: float64, unit: second
plFieldCost = "_pl_cost" // data type: float64, unit: second

svcName = "datakit"

Expand Down
4 changes: 3 additions & 1 deletion internal/pipeline/plval/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ type PipelineCfg struct {
UseSQLite bool `toml:"use_sqlite"`
SQLiteMemMode bool `toml:"sqlite_mem_mode"`
Offload *offload.OffloadConfig `toml:"offload"`
DisableAppendRunInfo bool `toml:"disable_append_run_info"`
EnableDebugFields bool `toml:"_"`

DeprecatedDisableAppendRunInfo bool `toml:"disable_append_run_info"`
}

// InitIPdb init ipdb instance.
Expand Down
6 changes: 3 additions & 3 deletions internal/pipeline/plval/plval.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var (
// offload.
_offloadWkr *offload.OffloadWorker

_enableAppendRunInfo bool = true
_enableAppendRunInfo bool = false
)

func EnableAppendRunInfo() bool {
Expand Down Expand Up @@ -120,8 +120,8 @@ func InitPlVal(cfg *PipelineCfg, upFn plmap.UploadFunc, gTags map[string]string,
SetIPDB(ipdb)
}

if cfg != nil && cfg.DisableAppendRunInfo {
_enableAppendRunInfo = false
if cfg != nil && cfg.EnableDebugFields {
_enableAppendRunInfo = true
}

// init refer-table
Expand Down
4 changes: 0 additions & 4 deletions internal/plugins/inputs/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,11 +422,7 @@ func (c *container) transformPoint(info *runtime.Container, setPodLabelAsTags fu
}
}

imageName, shortName, imageTag := runtime.ParseImage(image)
p.SetTag("image", image)
p.SetTag("image_name", imageName)
p.SetTag("image_short_name", shortName)
p.SetTag("image_tag", imageTag)

// only ecs fargate
p.SetTagIfNotEmpty("aws_ecs_cluster_name", getAWSClusterNameForLabels(info.Labels))
Expand Down
3 changes: 2 additions & 1 deletion internal/plugins/inputs/container/container_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"fmt"

"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/config"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/container/runtime"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/goroutine"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/logtail/fileprovider"
Expand Down Expand Up @@ -46,6 +47,7 @@ func (c *container) tailingLogs(ins *logInstance) {
tailer.WithSource(cfg.Source),
tailer.WithService(cfg.Service),
tailer.WithPipeline(cfg.Pipeline),
tailer.WithEnableDebugFields(config.Cfg.EnableDebugFields),
tailer.WithCharacterEncoding(cfg.CharacterEncoding),
tailer.WithMultilinePatterns(cfg.MultilinePatterns),
tailer.WithGlobalTags(mergedTags),
Expand Down Expand Up @@ -89,7 +91,6 @@ func (c *container) tailingLogs(ins *logInstance) {
pathAtInside := trimLogsFromRootfs(file)
if insidePath := joinInsideFilepath(cfg.hostDir, cfg.insideDir, pathAtInside); insidePath != pathAtInside {
newOpts = append(newOpts, tailer.WithTag("inside_filepath", insidePath))
newOpts = append(newOpts, tailer.WithTag("host_filepath", file))
}

tail, err := tailer.NewTailerSingle(file, newOpts...)
Expand Down
9 changes: 3 additions & 6 deletions internal/plugins/inputs/container/container_measurement.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,9 @@ func (*containerLog) Info() *inputs.MeasurementInfo {
"daemonset": inputs.NewTagInfo("The name of the DaemonSet which the object belongs to."),
},
Fields: map[string]interface{}{
"status": &inputs.FieldInfo{DataType: inputs.String, Unit: inputs.UnknownUnit, Desc: "The status of the logging, only supported `info/emerg/alert/critical/error/warning/debug/OK/unknown`."},
"log_read_lines": &inputs.FieldInfo{DataType: inputs.Int, Unit: inputs.NCount, Desc: "The lines of the read file ([:octicons-tag-24: Version-1.4.6](../datakit/changelog.md#cl-1.4.6))."},
"log_read_offset": &inputs.FieldInfo{DataType: inputs.Int, Unit: inputs.UnknownUnit, Desc: "The offset of the read file ([:octicons-tag-24: Version-1.4.8](../datakit/changelog.md#cl-1.4.8) · [:octicons-beaker-24: Experimental](../datakit/index.md#experimental))."},
"log_read_time": &inputs.FieldInfo{DataType: inputs.DurationSecond, Unit: inputs.UnknownUnit, Desc: "The timestamp of the read file."},
"message_length": &inputs.FieldInfo{DataType: inputs.SizeByte, Unit: inputs.NCount, Desc: "The length of the message content."},
"message": &inputs.FieldInfo{DataType: inputs.String, Unit: inputs.UnknownUnit, Desc: "The text of the logging."},
"status": &inputs.FieldInfo{DataType: inputs.String, Unit: inputs.UnknownUnit, Desc: "The status of the logging, only supported `info/emerg/alert/critical/error/warning/debug/OK/unknown`."},
"log_read_lines": &inputs.FieldInfo{DataType: inputs.Int, Unit: inputs.NCount, Desc: "The lines of the read file ([:octicons-tag-24: Version-1.4.6](../datakit/changelog.md#cl-1.4.6))."},
"message": &inputs.FieldInfo{DataType: inputs.String, Unit: inputs.UnknownUnit, Desc: "The text of the logging."},
},
}
}
9 changes: 3 additions & 6 deletions internal/plugins/inputs/container/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,9 @@ func replaceLabelKey(s string) string {

func (lc *logInstance) tags() map[string]string {
m := map[string]string{
"container_id": lc.id,
"container_name": lc.containerName,
"image": lc.image,
"image_name": lc.imageName,
"image_short_name": lc.imageShortName,
"image_tag": lc.imageTag,
"container_id": lc.id,
"container_name": lc.containerName,
"image": lc.image,
}

if lc.podName != "" {
Expand Down
11 changes: 5 additions & 6 deletions internal/plugins/inputs/logging/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/GuanceCloud/cliutils"
"github.com/GuanceCloud/cliutils/logger"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/config"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/datakit"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/goroutine"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/logtail/multiline"
Expand Down Expand Up @@ -146,6 +147,7 @@ func (ipt *Input) Run() {
tailer.WithSource(ipt.Source),
tailer.WithService(ipt.Service),
tailer.WithPipeline(ipt.Pipeline),
tailer.WithEnableDebugFields(config.Cfg.EnableDebugFields),
tailer.WithSockets(ipt.Sockets),
tailer.WithIgnoreStatus(ipt.IgnoreStatus),
tailer.WithFromBeginning(ipt.FromBeginning),
Expand Down Expand Up @@ -276,12 +278,9 @@ func (*loggingMeasurement) Info() *inputs.MeasurementInfo {
"service": inputs.NewTagInfo("Use the `service` of the config."),
},
Fields: map[string]interface{}{
"message": &inputs.FieldInfo{DataType: inputs.String, Unit: inputs.UnknownUnit, Desc: "The text of the logging."},
"status": &inputs.FieldInfo{DataType: inputs.String, Unit: inputs.UnknownUnit, Desc: "The status of the logging, default is `unknown`[^1]."},
"log_read_lines": &inputs.FieldInfo{DataType: inputs.Int, Unit: inputs.NCount, Desc: "The lines of the read file."},
"log_read_offset": &inputs.FieldInfo{DataType: inputs.Int, Unit: inputs.UnknownUnit, Desc: "The offset of the read file."},
"log_read_time": &inputs.FieldInfo{DataType: inputs.DurationSecond, Unit: inputs.UnknownUnit, Desc: "The timestamp of the read file."},
"message_length": &inputs.FieldInfo{DataType: inputs.SizeByte, Unit: inputs.NCount, Desc: "The length of the message content."},
"message": &inputs.FieldInfo{DataType: inputs.String, Unit: inputs.UnknownUnit, Desc: "The text of the logging."},
"status": &inputs.FieldInfo{DataType: inputs.String, Unit: inputs.UnknownUnit, Desc: "The status of the logging, default is `unknown`[^1]."},
"log_read_lines": &inputs.FieldInfo{DataType: inputs.Int, Unit: inputs.NCount, Desc: "The lines of the read file."},
"`__docid`": &inputs.FieldInfo{
DataType: inputs.String,
Unit: inputs.UnknownUnit,
Expand Down
21 changes: 12 additions & 9 deletions internal/tailer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type option struct {
// 解释文件内容时所使用的的字符编码,如果设置为空,将不进行转码处理
// e.g. "utf-8","utf-16le","utf-16be","gbk","gb18030"
characterEncoding string
// 添加 debug 字段
enableDebugFields bool

// 匹配正则表达式
// 符合此正则匹配的数据,将被认定为有效数据。否则会累积追加到上一条有效数据的末尾
Expand All @@ -48,7 +50,7 @@ type option struct {
// 判定不活跃文件
ignoreDeadLog time.Duration
// 添加额外tag
globalTags map[string]string
extraTags map[string]string

// 连续 N 次采集为空,就强制 flush 已有数据
maxForceFlushLimit int
Expand All @@ -70,6 +72,7 @@ func WithIgnoreStatus(arr []string) Option { return func(opt *option) { opt.ig
func WithPipeline(s string) Option { return func(opt *option) { opt.pipeline = s } }
func WithCharacterEncoding(s string) Option { return func(opt *option) { opt.characterEncoding = s } }
func WithFromBeginning(b bool) Option { return func(opt *option) { opt.fromBeginning = b } }
func WithEnableDebugFields(b bool) Option { return func(opt *option) { opt.enableDebugFields = b } }
func WithTextParserMode(mode Mode) Option { return func(opt *option) { opt.mode = mode } }

func WithSource(s string) Option {
Expand All @@ -90,10 +93,10 @@ func WithService(s string) Option {
s = opt.source
}
opt.service = s
if opt.globalTags == nil {
opt.globalTags = make(map[string]string)
if opt.extraTags == nil {
opt.extraTags = make(map[string]string)
}
opt.globalTags["service"] = opt.service
opt.extraTags["service"] = opt.service
}
}

Expand Down Expand Up @@ -144,17 +147,17 @@ func WithMaxForceFlushLimit(n int) Option {
func WithGlobalTags(m map[string]string) Option {
return func(opt *option) {
for k, v := range m {
opt.globalTags[k] = v
opt.extraTags[k] = v
}
}
}

func WithTag(key, value string) Option {
return func(opt *option) {
if opt.globalTags == nil {
opt.globalTags = make(map[string]string)
if opt.extraTags == nil {
opt.extraTags = make(map[string]string)
}
opt.globalTags[key] = value
opt.extraTags[key] = value
}
}

Expand All @@ -167,7 +170,7 @@ func WithFeeder(feeder dkio.Feeder) Option { return func(opt *option) { opt.fee
func defaultOption() *option {
return &option{
source: "default",
globalTags: map[string]string{"service": "default"},
extraTags: map[string]string{"service": "default"},
maxForceFlushLimit: 10,
fileFromBeginningThresholdSize: 1000 * 1000 * 1, // 1 MB
done: make(<-chan interface{}),
Expand Down
8 changes: 4 additions & 4 deletions internal/tailer/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestWithOptions(t *testing.T) {
WithService("testing-service")(opt)

res := map[string]string{"service": "testing-service"}
assert.Equal(t, opt.globalTags, res)
assert.Equal(t, opt.extraTags, res)
})

t.Run("with-default-service", func(t *testing.T) {
Expand All @@ -27,7 +27,7 @@ func TestWithOptions(t *testing.T) {
WithService("")(opt)

res := map[string]string{"service": "testing-source"}
assert.Equal(t, opt.globalTags, res)
assert.Equal(t, opt.extraTags, res)
})

t.Run("with-default-service", func(t *testing.T) {
Expand All @@ -36,14 +36,14 @@ func TestWithOptions(t *testing.T) {
WithSource("testing-source")(opt)

res := map[string]string{"service": "default"}
assert.Equal(t, opt.globalTags, res)
assert.Equal(t, opt.extraTags, res)
})

t.Run("with-non-service", func(t *testing.T) {
opt := defaultOption()
WithSource("testing-source")(opt)

res := map[string]string{"service": "default"}
assert.Equal(t, opt.globalTags, res)
assert.Equal(t, opt.extraTags, res)
})
}
2 changes: 1 addition & 1 deletion internal/tailer/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func NewSocketLogWithOptions(opts ...Option) (*SocketLogger, error) {
sk := &SocketLogger{
opt: c,
}
sk.tags = buildTags(sk.opt.globalTags)
sk.tags = buildTags(sk.opt.extraTags)
sk.log = logger.SLogger("socketLog/" + sk.opt.source)

if err := sk.setup(); err != nil {
Expand Down
Loading

0 comments on commit 72ae347

Please sign in to comment.