Skip to content

Commit

Permalink
修复容器日志采集存在 inside_filepath 和 host_filepath 标签错误的问题
Browse files Browse the repository at this point in the history
  • Loading branch information
liguozhuang authored and 谭彪 committed Sep 11, 2024
1 parent 6bfce6e commit 4b7811b
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 22 deletions.
23 changes: 15 additions & 8 deletions internal/plugins/inputs/container/container_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ func (c *container) tailingLogs(ins *logInstance) {
}

path := cfg.Path
if cfg.HostFilePath != "" {
path = cfg.HostFilePath
l.Infof("container log %s redirect to host path %s", cfg.Path, cfg.HostFilePath)
if cfg.hostFilePath != "" {
path = cfg.hostFilePath
l.Infof("container log %s redirect to host path %s", cfg.Path, cfg.hostFilePath)
}

mergedTags := inputs.MergeTags(c.extraTags, cfg.Tags, "")
Expand Down Expand Up @@ -65,16 +65,16 @@ func (c *container) tailingLogs(ins *logInstance) {
opts = append(opts, tailer.WithTextParserMode(tailer.CriLogdMode))
}

path = logsJoinRootfs(path)
pathAtRootfs := joinLogsAtRootfs(path)

filelist, err := fileprovider.NewProvider().SearchFiles([]string{path}).Result()
filelist, err := fileprovider.NewProvider().SearchFiles([]string{pathAtRootfs}).Result()
if err != nil {
l.Warnf("failed to scan container-log collection %s(%s) for %s, err: %s", cfg.Path, path, ins.containerName, err)
l.Warnf("failed to scan container-log collection %s(%s) for %s, err: %s", cfg.Path, pathAtRootfs, ins.containerName, err)
continue
}

if len(filelist) == 0 {
l.Infof("container %s not found any log file for path %s, skip", ins.containerName, path)
l.Infof("container %s not found any log file for path %s, skip", ins.containerName, pathAtRootfs)
continue
}

Expand All @@ -85,7 +85,14 @@ func (c *container) tailingLogs(ins *logInstance) {

l.Infof("add container log collection with path %s from source %s", file, cfg.Source)

tail, err := tailer.NewTailerSingle(file, opts...)
newOpts := opts
pathAtInside := trimLogsFromRootfs(file)
if insidePath := joinInsideFilepath(cfg.hostDir, cfg.insideDir, pathAtInside); insidePath != pathAtInside {
newOpts = append(newOpts, tailer.WithTag("inside_filepath", insidePath))
newOpts = append(newOpts, tailer.WithTag("host_filepath", file))
}

tail, err := tailer.NewTailerSingle(file, newOpts...)
if err != nil {
l.Errorf("failed to create container-log collection %s for %s, err: %s", file, ins.containerName, err)
continue
Expand Down
45 changes: 33 additions & 12 deletions internal/plugins/inputs/container/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,19 @@ type logConfig struct {
Disable bool `json:"disable"`
Type string `json:"type"`
Path string `json:"path"`
HostFilePath string `json:"-"`
Source string `json:"source"`
Service string `json:"service"`
CharacterEncoding string `json:"character_encoding"`
Pipeline string `json:"pipeline"`
Multiline string `json:"multiline_match"`
RemoveAnsiEscapeCodes bool `json:"remove_ansi_escape_codes"`
MultilinePatterns []string `json:"-"`
Tags map[string]string `json:"tags"`

MultilinePatterns []string `json:"-"`

hostDir string `json:"-"`
insideDir string `json:"-"`
hostFilePath string `json:"-"`
}

type logConfigs []*logConfig
Expand Down Expand Up @@ -86,15 +90,9 @@ func (lc *logInstance) parseLogConfigs() error {

for vol, hostdir := range lc.volMounts {
if strings.HasPrefix(path, vol) {
file := strings.TrimPrefix(path, vol)
cfg.HostFilePath = filepath.Join(hostdir, filepath.Clean(file))

// add target fileapath
if cfg.Tags == nil {
cfg.Tags = make(map[string]string)
}
cfg.Tags["inside_filepath"] = path
cfg.Tags["host_filepath"] = cfg.HostFilePath
cfg.hostDir = hostdir
cfg.insideDir = vol
cfg.hostFilePath = joinHostFilepath(hostdir, vol, path)

foundHostPath = true
}
Expand Down Expand Up @@ -239,7 +237,7 @@ func (lc *logInstance) tags() map[string]string {

const defaultContainerLogMountPoint = "/rootfs"

func logsJoinRootfs(logs string) string {
func joinLogsAtRootfs(logs string) string {
if !datakit.Docker && !config.IsKubernetes() {
return logs
}
Expand All @@ -248,3 +246,26 @@ func logsJoinRootfs(logs string) string {
}
return filepath.Join(defaultContainerLogMountPoint, logs)
}

func trimLogsFromRootfs(logs string) string {
if v := os.Getenv("HOST_ROOT"); v != "" {
return strings.TrimPrefix(logs, v)
}
return strings.TrimPrefix(logs, defaultContainerLogMountPoint)
}

func joinHostFilepath(hostDir, insideDir, insidePath string) string {
if hostDir == "" || insideDir == "" {
return insidePath
}
partialPath := strings.TrimPrefix(insidePath, insideDir)
return filepath.Join(hostDir, filepath.Clean(partialPath))
}

func joinInsideFilepath(hostDir, insideDir, hostPath string) string {
if hostDir == "" || insideDir == "" {
return hostPath
}
partialPath := strings.TrimPrefix(hostPath, hostDir)
return filepath.Join(insideDir, filepath.Clean(partialPath))
}
40 changes: 38 additions & 2 deletions internal/plugins/inputs/container/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,6 @@ func TestParseLogConfigs(t *testing.T) {
},
},
},
// TODO
// test HostFilePath
}

for idx, tc := range cases {
Expand Down Expand Up @@ -188,3 +186,41 @@ func TestParseLogConfigs(t *testing.T) {
t.Logf("[%d][OK ] %v\n", idx, tc)
}
}

func TestJoinHostFilepath(t *testing.T) {
cases := []struct {
inHostDir, inInsideDir, inPath string
out string
}{
{
inHostDir: "/var/lib/kubelet/pods/ABCDEFG012344567/volumes/kubernetes.io~empty-dir/<volume-name>/",
inInsideDir: "/tmp/log",
inPath: "/tmp/log/nginx-log/a.log",
out: "/var/lib/kubelet/pods/ABCDEFG012344567/volumes/kubernetes.io~empty-dir/<volume-name>/nginx-log/a.log",
},
}

for _, tc := range cases {
res := joinHostFilepath(tc.inHostDir, tc.inInsideDir, tc.inPath)
assert.Equal(t, tc.out, res)
}
}

func TestJoinInsideFilepath(t *testing.T) {
cases := []struct {
inHostDir, inInsideDir, inPath string
out string
}{
{
inHostDir: "/var/lib/kubelet/pods/ABCDEFG012344567/volumes/kubernetes.io~empty-dir/<volume-name>/",
inInsideDir: "/tmp/log",
inPath: "/var/lib/kubelet/pods/ABCDEFG012344567/volumes/kubernetes.io~empty-dir/<volume-name>/nginx-log/a.log",
out: "/tmp/log/nginx-log/a.log",
},
}

for _, tc := range cases {
res := joinInsideFilepath(tc.inHostDir, tc.inInsideDir, tc.inPath)
assert.Equal(t, tc.out, res)
}
}
9 changes: 9 additions & 0 deletions internal/tailer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,15 @@ func WithGlobalTags(m map[string]string) Option {
}
}

func WithTag(key, value string) Option {
return func(opt *option) {
if opt.globalTags == nil {
opt.globalTags = make(map[string]string)
}
opt.globalTags[key] = value
}
}

func WithDone(ch <-chan interface{}) Option {
return func(opt *option) { opt.setDone = true; opt.done = ch }
}
Expand Down

0 comments on commit 4b7811b

Please sign in to comment.