diff --git a/internal/tailer/logs.go b/internal/tailer/logs.go index f869219fe7..4563ed431d 100644 --- a/internal/tailer/logs.go +++ b/internal/tailer/logs.go @@ -22,7 +22,7 @@ const ( disableHighFreqIODdata = false ) -type logs struct { +type Logs struct { text string fields map[string]interface{} ts time.Time @@ -30,11 +30,11 @@ type logs struct { err error } -func newLogs(text string) *logs { - return &logs{text: text, fields: make(map[string]interface{})} +func NewLogs(text string) *Logs { + return &Logs{text: text, fields: make(map[string]interface{})} } -func (x *logs) pipeline(p *pipeline.Pipeline) *logs { +func (x *Logs) Pipeline(p *pipeline.Pipeline) *Logs { if x.err != nil || p == nil { x.fields["message"] = x.text return x @@ -51,7 +51,7 @@ func (x *logs) pipeline(p *pipeline.Pipeline) *logs { // checkFieldsLength 检查数据是否过长 // 只有在碰到非 message 字段,且长度超过最大限制时才会返回 error // 防止通过 pipeline 添加巨长字段的恶意行为 -func (x *logs) checkFieldsLength() *logs { +func (x *Logs) CheckFieldsLength() *Logs { if x.err != nil { return x } @@ -101,7 +101,7 @@ var statusMap = map[string]string{ } // addStatus 添加默认status和status映射 -func (x *logs) addStatus(disable bool) *logs { +func (x *Logs) AddStatus(disable bool) *Logs { if x.err != nil || disable { return x } @@ -130,7 +130,7 @@ func (x *logs) addStatus(disable bool) *logs { } // ignoreStatus 过滤指定status -func (x *logs) ignoreStatus(ignoreStatus []string) *logs { +func (x *Logs) IgnoreStatus(ignoreStatus []string) *Logs { if x.err != nil || len(ignoreStatus) == 0 { return x } @@ -147,7 +147,7 @@ func (x *logs) ignoreStatus(ignoreStatus []string) *logs { return x } -func (x *logs) takeTime() *logs { +func (x *Logs) TakeTime() *Logs { if x.err != nil { return x } @@ -170,7 +170,7 @@ func (x *logs) takeTime() *logs { return x } -func (x *logs) point(measurement string, tags map[string]string) *logs { +func (x *Logs) Point(measurement string, tags map[string]string) *Logs { if x.err != nil { return x } @@ -178,7 +178,7 @@ func (x *logs) point(measurement string, tags map[string]string) *logs { return x } -func (x *logs) feed(inputName string) *logs { +func (x *Logs) Feed(inputName string) *Logs { if x.err != nil { return x } @@ -190,14 +190,14 @@ func (x *logs) feed(inputName string) *logs { return x } -func (x *logs) output() string { +func (x *Logs) Output() string { if x.pt == nil { return "" } return x.pt.String() } -func (x *logs) error() error { +func (x *Logs) Error() error { return x.err } diff --git a/internal/tailer/logs_test.go b/internal/tailer/logs_test.go index bb6a9407f0..eb38e6cab2 100644 --- a/internal/tailer/logs_test.go +++ b/internal/tailer/logs_test.go @@ -40,7 +40,7 @@ func TestLogsAll(t *testing.T) { point(source, nil). output() - tu.Assert(t, tc.text != tc.res, + tu.Assert(t, output == tc.res, "\nexpect: %s\n got: %s", tc.res, output) } diff --git a/internal/tailer/tailer_single.go b/internal/tailer/tailer_single.go index 01f4902c80..4fede06562 100644 --- a/internal/tailer/tailer_single.go +++ b/internal/tailer/tailer_single.go @@ -152,15 +152,15 @@ func (t *TailerSingle) processText(text string) error { return nil } - err := newLogs(text). - pipeline(t.pipeline). - checkFieldsLength(). - addStatus(t.opt.DisableAddStatusField). - ignoreStatus(t.opt.IgnoreStatus). - takeTime(). - point(t.opt.Source, t.tags). - feed(t.opt.InputName). - error() + err := NewLogs(text). + Pipeline(t.pipeline). + CheckFieldsLength(). + AddStatus(t.opt.DisableAddStatusField). + IgnoreStatus(t.opt.IgnoreStatus). + TakeTime(). + Point(t.opt.Source, t.tags). + Feed(t.opt.InputName). + Error() return err } diff --git a/man/manuals/container.md b/man/manuals/container.md index fd27504f36..e727e730cb 100644 --- a/man/manuals/container.md +++ b/man/manuals/container.md @@ -60,14 +60,35 @@ 对于容器日志采集,有着更细致的配置,主要解决了区分日志 `source` 和使用 pipeline 的问题。 -日志采集配置项为 `[[inputs.container.logfilter]]`,该项是数组配置,意即可以有多个 logfilter 来处理采集到的容器日志,比如某个容器中既有 MySQL 日志,也有 Redis 日志,那么此时可能需要两个 logfilter 来分别处理它们。 +日志采集配置项为 `[[inputs.container.log]]`,该项是数组配置,意即可以有多个 log 来处理采集到的容器日志,比如某个容器中既有 MySQL 日志,也有 Redis 日志,那么此时可能需要两个 log 来分别处理它们。 -- `filter_message` 为匹配日志文本的正则表达式,该参数类型是字符串数组,只要任意一个正则匹配成功即可。未匹配的日志内容将被丢弃。[正则表达式参见这里](https://golang.org/pkg/regexp/syntax/#hdr-Syntax) +- `match_by` 为匹配类型,只支持填写 `contianer-name` 和 `deployment-name`(注意是中横线)。例如该项为 `container_name`,则会以容器名进行后续的正则匹配,当匹配成功时使用此 `source` 和 pipeline +- `match` 为匹配日志文本的正则表达式,该参数类型是字符串数组,只要任意一个正则匹配成功即可。未匹配的容器,其日志将执行默认处理方式。[正则表达式参见这里](https://golang.org/pkg/regexp/syntax/#hdr-Syntax) >Tips:为保证此处正则表达式的正确书写,请务必将正则表达式用 `'''这里是一个正则表达式'''` 这种形式来配置(即两边用三个单引号来包围正则文本),否则可能导致正则转义问题。 -- `source` 指定数据来源,如果为空值,则默认使用容器名 +- `source` 指定数据来源,其值不可为空 - `service` 指定该条日志的服务名,如果为空值,则使用 `source` 字段值 - `pipeline` 只需写文件名即可,不需要写全路径,使用方式见 [Pipeline 文档](pipeline)。当此值为空值或该文件不存在时,将不使用 pipeline 功能 +如果一个容器的 `container name` 和 `deployment` 分别匹配两个 log,会优先使用 `deployment` 所匹配的 log。例如容器的 `container name` 为 `containerAAA`,`deployment` 为 `deploymentAAA`,且配置如下: + +``` +[[inputs.container.log]] + match_by = "container-name" + match = ['''container*'''] + source = "dummy1" + service = "dummy1" + pipeline = "dummy1.p" + +[[inputs.container.log]] + match_by = "deployment-name" + match = ['''deployment*'''] + source = "dummy2" + service = "dummy2" + pipeline = "dummy2.p" +``` + +此时该容器能够匹配两个 log,优先使用第二个 `deployment`。 + #### 日志切割注意事项 使用 pipeline 功能时,如果切割成功,则: diff --git a/plugins/inputs/container/const.go b/plugins/inputs/container/const.go index de93d0c520..1b8932ef54 100644 --- a/plugins/inputs/container/const.go +++ b/plugins/inputs/container/const.go @@ -77,10 +77,10 @@ const sampleCfg = ` ## Use TLS but skip chain & host verification # insecure_skip_verify = false - #[[inputs.container.logfilter]] - # filter_message = [ - # ''' 2 { + return strings.Join(s[:len(s)-2], "-") + } + return name +} diff --git a/plugins/inputs/container/k8s_test.go b/plugins/inputs/container/k8s_test.go index 5b0338f9f5..c4b597cf68 100644 --- a/plugins/inputs/container/k8s_test.go +++ b/plugins/inputs/container/k8s_test.go @@ -2,6 +2,8 @@ package container import ( "testing" + + tu "gitlab.jiagouyun.com/cloudcare-tools/cliutils/testutil" ) func TestGetContainerPodName(t *testing.T) { @@ -29,3 +31,38 @@ func TestGetContainerPodName(t *testing.T) { t.Logf("[%d] container_id:%s pod_name:%s\n", idx, tc.id, name) } } + +func TestGetDeploymentFromPodName(t *testing.T) { + var cases = []struct { + podName, deploymentName string + }{ + { + "corestone-76b5fb8bd-lbxc6", + "corestone", + }, + { + "nsqd-7c49ff9c77-w85mb", + "nsqd", + }, + { + "kodo-inner-5df4fb4897-csqdz", + "kodo-inner", + }, + { + "invalid-12345678", + "invalid-12345678", + }, + { + "invalid", + "invalid", + }, + } + + for _, tc := range cases { + output := getDeploymentFromPodName(tc.podName) + + tu.Assert(t, output == tc.deploymentName, + "\nexpect: %s\n got: %s", + tc.deploymentName, output) + } +} diff --git a/plugins/inputs/container/log.go b/plugins/inputs/container/log.go new file mode 100644 index 0000000000..db6ded28bb --- /dev/null +++ b/plugins/inputs/container/log.go @@ -0,0 +1,86 @@ +package container + +import ( + "fmt" + "regexp" +) + +type DepercatedLog struct { + FilterMessage []string `toml:"filter_message"` + Source string `toml:"source"` + Service string `toml:"service"` + Pipeline string `toml:"pipeline"` +} + +const ( + logMatchByContainerName = "container-name" + logMatchByDeploymentName = "deployment-name" +) + +type Logs []Log + +type Log struct { + MatchBy string `toml:"match_by"` + Match []string `toml:"match"` + Source string `toml:"source"` + Service string `toml:"service"` + Pipeline string `toml:"pipeline"` + + pattern []*regexp.Regexp +} + +func (gs Logs) Init() error { + for idx, g := range gs { + if g.Source == "" { + return fmt.Errorf("log[%d] source cannot be empty", idx) + } + switch g.MatchBy { + case logMatchByContainerName, logMatchByDeploymentName: + //nil + default: + return fmt.Errorf("invalind by %s, only accept %s and %s", + g.MatchBy, logMatchByContainerName, logMatchByDeploymentName) + } + + // regexp + for _, match := range g.Match { + pattern, err := regexp.Compile(match) + if err != nil { + return fmt.Errorf("config match index[%d], error: %s", idx, err) + } + gs[idx].pattern = append(gs[idx].pattern, pattern) + } + + if g.Service == "" { + g.Service = g.Source + } + } + + return nil +} + +// Match 如果匹配成功则返回该项下标,否则返回 -1 +func (gs Logs) Match(by, str string) (index int) { + if str == "" { + return -1 + } + for idx, g := range gs { + if by != g.MatchBy { + continue + } + for _, pattern := range g.pattern { + if pattern.MatchString(str) { + return idx + } + } + } + return -1 +} + +func (gs Logs) MatchName(deploymentName, containerName string) (index int) { + n := gs.Match(logMatchByDeploymentName, deploymentName) + if n != -1 { + return n + } + return gs.Match(logMatchByContainerName, containerName) +} diff --git a/plugins/inputs/container/logfilter.go b/plugins/inputs/container/logfilter.go deleted file mode 100644 index caf59a7d08..0000000000 --- a/plugins/inputs/container/logfilter.go +++ /dev/null @@ -1,95 +0,0 @@ -package container - -import ( - "fmt" - "path/filepath" - "regexp" - "sync" - - "gitlab.jiagouyun.com/cloudcare-tools/datakit" - "gitlab.jiagouyun.com/cloudcare-tools/datakit/pipeline" -) - -type LogFilters []*LogFilter - -type LogFilter struct { - FilterMessage []string `toml:"filter_message"` - FilterMultiline string `toml:"-"` - Source string `toml:"source"` - Service string `toml:"service"` - Pipeline string `toml:"pipeline"` - - pipelinePool sync.Pool - - multilinePattern *regexp.Regexp - messagePattern []*regexp.Regexp -} - -func (lf *LogFilter) Init() error { - if lf.Service == "" { - lf.Service = lf.Source - } - - if lf.FilterMultiline != "" { - pattern, err := regexp.Compile(lf.FilterMultiline) - if err != nil { - return fmt.Errorf("config FilterMultiline, error: %s", err) - } - lf.multilinePattern = pattern - } - - // regexp - for idx, m := range lf.FilterMessage { - pattern, err := regexp.Compile(m) - if err != nil { - return fmt.Errorf("config FilterMessage index[%d], error: %s", idx, err) - } - lf.messagePattern = append(lf.messagePattern, pattern) - } - - // pipeline 不是并发安全,无法支持多个 goroutine 使用同一个 pipeline 对象 - // 所以在此处使用 pool - // 另,regexp 是并发安全的 - lf.pipelinePool = sync.Pool{ - New: func() interface{} { - if lf.Pipeline == "" { - return nil - } - - // 即使 pipeline 配置错误,也不会影响全局 - p, err := pipeline.NewPipelineFromFile(filepath.Join(datakit.PipelineDir, lf.Pipeline)) - if err != nil { - l.Debugf("new pipeline error: %s", err) - return nil - } - return p - }, - } - - return nil -} -func (lf *LogFilter) RunPipeline(message string) (map[string]interface{}, error) { - pipe := lf.pipelinePool.Get() - // pipe 为空指针(即没有配置 pipeline),将返回默认值 - if pipe == nil { - return map[string]interface{}{"message": message}, nil - } - - return pipe.(*pipeline.Pipeline).Run(message).Result() -} - -func (lf *LogFilter) MatchMessage(message string) bool { - for _, pattern := range lf.messagePattern { - if pattern.MatchString(message) { - return true - } - } - return false -} - -func (lf *LogFilter) MatchMultiline(message string) bool { - if lf.multilinePattern == nil { - return false - } - return lf.multilinePattern.MatchString(message) -} diff --git a/plugins/inputs/container/manual.go b/plugins/inputs/container/manual.go index 59ebbd3db2..76bb6ccfe8 100644 --- a/plugins/inputs/container/manual.go +++ b/plugins/inputs/container/manual.go @@ -29,8 +29,9 @@ func (c *containerMetricMeasurement) Info() *inputs.MeasurementInfo { "images_tag": inputs.NewTagInfo("镜像 tag,例如 `1.21.0`"), "container_type": inputs.NewTagInfo(`容器类型,表明该容器由谁创建,kubernetes/docker`), "state": inputs.NewTagInfo(`运行状态,running`), - "pod_name": inputs.NewTagInfo(`pod 名称`), - "pod_namesapce": inputs.NewTagInfo(`pod 命名空间`), + "pod_name": inputs.NewTagInfo(`pod 名称(容器由 k8s 创建时存在)`), + "pod_namesapce": inputs.NewTagInfo(`pod 命名空间(容器由 k8s 创建时存在)`), + "deployment": inputs.NewTagInfo(`deployment 名称(容器由 k8s 创建时存在)`), }, Fields: map[string]interface{}{ "cpu_usage": &inputs.FieldInfo{DataType: inputs.Float, Unit: inputs.Percent, Desc: "CPU 占主机总量的使用率"}, @@ -58,7 +59,7 @@ func (c *containerObjectMeasurement) LineProto() (*io.Point, error) { func (c *containerObjectMeasurement) Info() *inputs.MeasurementInfo { return &inputs.MeasurementInfo{ Name: containerName, - Desc: "容器对象数据(忽略 k8s pause 容器),数值型字段仅在容器处于 running 时存在,例如 cpu_usage 等", + Desc: "容器对象数据(忽略 k8s pause 容器),如果容器处于非 running 状态,则`cpu_usage`等指标将不存在", Tags: map[string]interface{}{ "container_id": inputs.NewTagInfo(`容器 ID(该字段默认被删除)`), "name": inputs.NewTagInfo(`对象数据的指定 ID`), @@ -71,8 +72,9 @@ func (c *containerObjectMeasurement) Info() *inputs.MeasurementInfo { "container_host": inputs.NewTagInfo(`容器内部的主机名`), "container_type": inputs.NewTagInfo(`容器类型,表明该容器由谁创建,kubernetes/docker`), "state": inputs.NewTagInfo(`运行状态,running/exited/removed`), - "pod_name": inputs.NewTagInfo(`pod 名称`), - "pod_namesapce": inputs.NewTagInfo(`pod 命名空间`), + "pod_name": inputs.NewTagInfo(`pod 名称(容器由 k8s 创建时存在)`), + "pod_namesapce": inputs.NewTagInfo(`pod 命名空间(容器由 k8s 创建时存在)`), + "deployment": inputs.NewTagInfo(`deployment 名称(容器由 k8s 创建时存在)`), }, Fields: map[string]interface{}{ "process": &inputs.FieldInfo{DataType: inputs.String, Unit: inputs.UnknownUnit, Desc: "容器进程列表,即运行命令`ps -ef`所得,内容为 JSON 字符串,格式是 map 数组"}, @@ -110,9 +112,12 @@ func (c *containerLogMeasurement) Info() *inputs.MeasurementInfo { "container_id": inputs.NewTagInfo(`容器ID`), "container_type": inputs.NewTagInfo(`容器类型,表明该容器由谁创建,kubernetes/docker`), "stream": inputs.NewTagInfo(`数据流方式,stdout/stderr/tty`), + "pod_name": inputs.NewTagInfo(`pod 名称(容器由 k8s 创建时存在)`), + "pod_namesapce": inputs.NewTagInfo(`pod 命名空间(容器由 k8s 创建时存在)`), + "deployment": inputs.NewTagInfo(`deployment 名称(容器由 k8s 创建时存在)`), + "service": inputs.NewTagInfo(`服务名称`), }, Fields: map[string]interface{}{ - "service": &inputs.FieldInfo{DataType: inputs.String, Unit: inputs.UnknownUnit, Desc: "服务名称"}, "status": &inputs.FieldInfo{DataType: inputs.String, Unit: inputs.UnknownUnit, Desc: "日志状态,info/emerg/alert/critical/error/warning/debug/OK"}, "message": &inputs.FieldInfo{DataType: inputs.String, Unit: inputs.UnknownUnit, Desc: "日志源数据"}, }, @@ -162,7 +167,7 @@ func (k *kubeletPodObjectMeasurement) LineProto() (*io.Point, error) { func (k *kubeletPodObjectMeasurement) Info() *inputs.MeasurementInfo { return &inputs.MeasurementInfo{ Name: kubeletPodName, - Desc: "kubelet pod 对象数据(数值型字段仅在 pod 处于 Running 时存在,例如 cpu_usage 等)", + Desc: "kubelet pod 对象数据,如果 pod 处于非 Running 状态,则`cpu_usage`等指标将不存在", Tags: map[string]interface{}{ "node_name": inputs.NewTagInfo(`所在 kubelet node 名字`), "name": inputs.NewTagInfo(`pod UID`),