Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][pkg/stanza] Localize reader attribute tests #29678

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 0 additions & 215 deletions pkg/stanza/fileconsumer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,221 +86,6 @@ See this issue for details: https://github.com/census-instrumentation/opencensus
require.NoError(t, operator.Stop())
}

// AddFields tests that the `log.file.name` and `log.file.path` fields are included
// when IncludeFileName and IncludeFilePath are set to true
func TestAddFileFields(t *testing.T) {
t.Parallel()

tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
cfg.IncludeFileName = true
cfg.IncludeFilePath = true
cfg.IncludeFileNameResolved = false
cfg.IncludeFilePathResolved = false
operator, sink := testManager(t, cfg)

// Create a file, then start
temp := filetest.OpenTemp(t, tempDir)
filetest.WriteString(t, temp, "testlog\n")

require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister()))
defer func() {
require.NoError(t, operator.Stop())
}()

_, attributes := sink.NextCall(t)
require.Equal(t, filepath.Base(temp.Name()), attributes[attrs.LogFileName])
require.Equal(t, temp.Name(), attributes[attrs.LogFilePath])
require.Nil(t, attributes[attrs.LogFileNameResolved])
require.Nil(t, attributes[attrs.LogFilePathResolved])
}

// AddFileResolvedFields tests that the `log.file.name_resolved` and `log.file.path_resolved` fields are included
// when IncludeFileNameResolved and IncludeFilePathResolved are set to true
func TestAddFileResolvedFields(t *testing.T) {
if runtime.GOOS == windowsOS {
t.Skip("Windows symlinks usage disabled for now. See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21088")
}
t.Parallel()

tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
cfg.IncludeFileName = true
cfg.IncludeFilePath = true
cfg.IncludeFileNameResolved = true
cfg.IncludeFilePathResolved = true
operator, sink := testManager(t, cfg)

// Create temp dir with log file
dir := t.TempDir()

file, err := os.CreateTemp(dir, "")
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, file.Close())
})

// Create symbolic link in monitored directory
symLinkPath := filepath.Join(tempDir, "symlink")
err = os.Symlink(file.Name(), symLinkPath)
require.NoError(t, err)

// Populate data
filetest.WriteString(t, file, "testlog\n")

// Resolve path
realPath, err := filepath.EvalSymlinks(file.Name())
require.NoError(t, err)
resolved, err := filepath.Abs(realPath)
require.NoError(t, err)

require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister()))
defer func() {
require.NoError(t, operator.Stop())
}()

_, attributes := sink.NextCall(t)
require.Equal(t, filepath.Base(symLinkPath), attributes[attrs.LogFileName])
require.Equal(t, symLinkPath, attributes[attrs.LogFilePath])
require.Equal(t, filepath.Base(resolved), attributes[attrs.LogFileNameResolved])
require.Equal(t, resolved, attributes[attrs.LogFilePathResolved])
}

// AddFileResolvedFields tests that the `log.file.name_resolved` and `log.file.path_resolved` fields are included
// when IncludeFileNameResolved and IncludeFilePathResolved are set to true and underlaying symlink change
// Scenario:
// monitored file (symlink) -> middleSymlink -> file_1
// monitored file (symlink) -> middleSymlink -> file_2
func TestAddFileResolvedFieldsWithChangeOfSymlinkTarget(t *testing.T) {
if runtime.GOOS == windowsOS {
t.Skip("Windows symlinks usage disabled for now. See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21088")
}
t.Parallel()

tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
cfg.IncludeFileName = true
cfg.IncludeFilePath = true
cfg.IncludeFileNameResolved = true
cfg.IncludeFilePathResolved = true
operator, sink := testManager(t, cfg)

// Create temp dir with log file
dir := t.TempDir()

file1, err := os.CreateTemp(dir, "")
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, file1.Close())
})

file2, err := os.CreateTemp(dir, "")
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, file2.Close())
})

// Resolve paths
real1, err := filepath.EvalSymlinks(file1.Name())
require.NoError(t, err)
resolved1, err := filepath.Abs(real1)
require.NoError(t, err)

real2, err := filepath.EvalSymlinks(file2.Name())
require.NoError(t, err)
resolved2, err := filepath.Abs(real2)
require.NoError(t, err)

// Create symbolic link in monitored directory
// symLinkPath(target of file input) -> middleSymLinkPath -> file1
middleSymLinkPath := filepath.Join(dir, "symlink")
symLinkPath := filepath.Join(tempDir, "symlink")
err = os.Symlink(file1.Name(), middleSymLinkPath)
require.NoError(t, err)
err = os.Symlink(middleSymLinkPath, symLinkPath)
require.NoError(t, err)

// Populate data
filetest.WriteString(t, file1, "testlog\n")

require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister()))
defer func() {
require.NoError(t, operator.Stop())
}()

_, attributes := sink.NextCall(t)
require.Equal(t, filepath.Base(symLinkPath), attributes[attrs.LogFileName])
require.Equal(t, symLinkPath, attributes[attrs.LogFilePath])
require.Equal(t, filepath.Base(resolved1), attributes[attrs.LogFileNameResolved])
require.Equal(t, resolved1, attributes[attrs.LogFilePathResolved])

// Change middleSymLink to point to file2
err = os.Remove(middleSymLinkPath)
require.NoError(t, err)
err = os.Symlink(file2.Name(), middleSymLinkPath)
require.NoError(t, err)

// Populate data (different content due to fingerprint)
filetest.WriteString(t, file2, "testlog2\n")

_, attributes = sink.NextCall(t)
require.Equal(t, filepath.Base(symLinkPath), attributes[attrs.LogFileName])
require.Equal(t, symLinkPath, attributes[attrs.LogFilePath])
require.Equal(t, filepath.Base(resolved2), attributes[attrs.LogFileNameResolved])
require.Equal(t, resolved2, attributes[attrs.LogFilePathResolved])
}

func TestFileFieldsUpdatedAfterRestart(t *testing.T) {
t.Parallel()

tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
cfg.IncludeFileName = true
cfg.IncludeFilePath = true
op1, sink1 := testManager(t, cfg)

// Create a file, then start
temp, err := os.CreateTemp(tempDir, "")
require.NoError(t, err)
filetest.WriteString(t, temp, "testlog1\n")

persister := testutil.NewUnscopedMockPersister()
require.NoError(t, op1.Start(persister))

token, attributes := sink1.NextCall(t)
assert.Equal(t, []byte("testlog1"), token)
assert.Equal(t, filepath.Base(temp.Name()), attributes[attrs.LogFileName])
assert.Equal(t, temp.Name(), attributes[attrs.LogFilePath])
assert.Nil(t, attributes[attrs.LogFileNameResolved])
assert.Nil(t, attributes[attrs.LogFilePathResolved])

require.NoError(t, op1.Stop())
temp.Close() // On windows, we must close the file before renaming it

newPath := temp.Name() + "_renamed"
require.NoError(t, os.Rename(temp.Name(), newPath))

temp = filetest.OpenFile(t, newPath)
filetest.WriteString(t, temp, "testlog2\n")

op2, sink2 := testManager(t, cfg)

require.NoError(t, op2.Start(persister))

token, attributes = sink2.NextCall(t)
assert.Equal(t, []byte("testlog2"), token)
assert.Equal(t, filepath.Base(newPath), attributes[attrs.LogFileName])
assert.Equal(t, newPath, attributes[attrs.LogFilePath])
assert.Nil(t, attributes[attrs.LogFileNameResolved])
assert.Nil(t, attributes[attrs.LogFilePathResolved])

require.NoError(t, op2.Stop())
}

// ReadExistingLogs tests that, when starting from beginning, we
// read all the lines that are already there
func TestReadExistingLogs(t *testing.T) {
Expand Down
133 changes: 133 additions & 0 deletions pkg/stanza/fileconsumer/internal/reader/attributes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package reader

import (
"context"
"os"
"path/filepath"
"runtime"
"testing"

"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/attrs"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/filetest"
)

// AddFields tests that the `log.file.name` and `log.file.path` fields are included
// when IncludeFileName and IncludeFilePath are set to true
func TestAttributes(t *testing.T) {
t.Parallel()

// Create a file, then start
tempDir := t.TempDir()
temp := filetest.OpenTemp(t, tempDir)
filetest.WriteString(t, temp, "testlog\n")

f, sink := testFactory(t, includeFileName(), includeFilePath())
fp, err := f.NewFingerprint(temp)
require.NoError(t, err)

reader, err := f.NewReader(temp, fp)
require.NoError(t, err)
defer reader.Close()

reader.ReadToEnd(context.Background())

token, attributes := sink.NextCall(t)
require.Equal(t, []byte("testlog"), token)
require.Equal(t, filepath.Base(temp.Name()), attributes[attrs.LogFileName])
require.Equal(t, temp.Name(), attributes[attrs.LogFilePath])
require.Nil(t, attributes[attrs.LogFileNameResolved])
require.Nil(t, attributes[attrs.LogFilePathResolved])
}

// TestAttributesResolved tests that the `log.file.name_resolved` and `log.file.path_resolved` fields are included
// when IncludeFileNameResolved and IncludeFilePathResolved are set to true
func TestAttributesResolved(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("Windows symlinks usage disabled for now. See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21088")
}
t.Parallel()

// Set up actual file
actualDir := t.TempDir()
actualFile, err := os.CreateTemp(actualDir, "")
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, actualFile.Close()) })

// Resolve path
realPath, err := filepath.EvalSymlinks(actualFile.Name())
require.NoError(t, err)
resolved, err := filepath.Abs(realPath)
require.NoError(t, err)

// Create another directory with a symbolic link to the file
symlinkDir := t.TempDir()
symlinkPath := filepath.Join(symlinkDir, "symlink")
require.NoError(t, os.Symlink(actualFile.Name(), symlinkPath))
symlinkFile := filetest.OpenFile(t, symlinkPath)

// Populate data
filetest.WriteString(t, actualFile, "testlog\n")

// Read the data
f, sink := testFactory(t, includeFileName(), includeFilePath(), includeFileNameResolved(), includeFilePathResolved())
fp, err := f.NewFingerprint(symlinkFile)
require.NoError(t, err)
reader, err := f.NewReader(symlinkFile, fp)
require.NoError(t, err)
defer reader.Close()
reader.ReadToEnd(context.Background())

// Validate expectations
token, attributes := sink.NextCall(t)
require.Equal(t, []byte("testlog"), token)
require.Equal(t, filepath.Base(symlinkPath), attributes[attrs.LogFileName])
require.Equal(t, symlinkPath, attributes[attrs.LogFilePath])
require.Equal(t, filepath.Base(resolved), attributes[attrs.LogFileNameResolved])
require.Equal(t, resolved, attributes[attrs.LogFilePathResolved])

// Move symlinked file
newActualPath := actualFile.Name() + "_renamed"
require.NoError(t, os.Remove(symlinkPath))
require.NoError(t, os.Rename(actualFile.Name(), newActualPath))
require.NoError(t, os.Symlink(newActualPath, symlinkPath))

// Append additional data
filetest.WriteString(t, actualFile, "testlog2\n")

// Recreate the reader
symlinkFile = filetest.OpenFile(t, symlinkPath)
reader, err = f.NewReaderFromMetadata(symlinkFile, reader.Close())
require.NoError(t, err)
reader.ReadToEnd(context.Background())

token, attributes = sink.NextCall(t)
require.Equal(t, []byte("testlog2"), token)
require.Equal(t, filepath.Base(symlinkPath), attributes[attrs.LogFileName])
require.Equal(t, symlinkPath, attributes[attrs.LogFilePath])
require.Equal(t, filepath.Base(resolved)+"_renamed", attributes[attrs.LogFileNameResolved])
require.Equal(t, resolved+"_renamed", attributes[attrs.LogFilePathResolved])

// Append additional data
filetest.WriteString(t, actualFile, "testlog3\n")

// Recreate the factory with the attributes disabled
f, sink = testFactory(t)

// Recreate the reader and read new data
symlinkFile = filetest.OpenFile(t, symlinkPath)
reader, err = f.NewReaderFromMetadata(symlinkFile, reader.Close())
require.NoError(t, err)
reader.ReadToEnd(context.Background())

token, attributes = sink.NextCall(t)
require.Equal(t, []byte("testlog3"), token)
require.Nil(t, attributes[attrs.LogFileName])
require.Nil(t, attributes[attrs.LogFilePath])
require.Nil(t, attributes[attrs.LogFileNameResolved])
require.Nil(t, attributes[attrs.LogFilePathResolved])
}
Loading