Skip to content

Commit

Permalink
[receiver/filelog] fix record counting with header (#35870)
Browse files Browse the repository at this point in the history
#### Description

Fixes
#35869
by refactoring of the `Reader::ReadToEnd` method.

This refactors the `Reader::ReadToEnd` method by separating reading the
file's header from reading the file's contents.
This results in very similar code in `readHeader` and `readContents`
methods, which was previously deduplicated at the cost of slightly
higher complexity.
The bug could be fixed without separating header reading from contents
reading, but I hope this separation will make it easier to implement
content batching in the Reader
(#35455).
Content batching was my original motivation for these code changes.
I only discovered the problem with record counting when reading the
code.

#### Link to tracking issue

Fixes
#35869

#### Testing

In the first commit I have added tests that document the erroneous
behavior. In the second commit I have fixed the bug and corrected the
tests.

#### Documentation

Added changelog entry.
  • Loading branch information
andrzej-stencel authored Oct 28, 2024
1 parent 1e41cec commit ed302a3
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 55 deletions.
27 changes: 27 additions & 0 deletions .chloggen/refactor-reader-header-reading.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: receiver/filelog

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: fix record counting with header

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35869]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
6 changes: 0 additions & 6 deletions pkg/stanza/fileconsumer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/featuregate"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/emittest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
Expand Down Expand Up @@ -724,11 +723,6 @@ func TestBuildWithSplitFunc(t *testing.T) {
}

func TestBuildWithHeader(t *testing.T) {
require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), true))
t.Cleanup(func() {
require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), false))
})

basicConfig := func() *Config {
cfg := NewConfig()
cfg.Include = []string{"/var/log/testpath.*"}
Expand Down
90 changes: 75 additions & 15 deletions pkg/stanza/fileconsumer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1151,11 +1151,6 @@ func TestMaxBatching(t *testing.T) {
// TestReadExistingLogsWithHeader tests that, when starting from beginning, we
// read all the lines that are already there, and parses the headers
func TestReadExistingLogsWithHeader(t *testing.T) {
require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), true))
t.Cleanup(func() {
require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), false))
})

tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
Expand Down Expand Up @@ -1247,11 +1242,6 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) {
}

func TestHeaderPersistance(t *testing.T) {
require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), true))
t.Cleanup(func() {
require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), false))
})

tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
Expand Down Expand Up @@ -1287,11 +1277,6 @@ func TestHeaderPersistance(t *testing.T) {
}

func TestHeaderPersistanceInHeader(t *testing.T) {
require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), true))
t.Cleanup(func() {
require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), false))
})

tempDir := t.TempDir()
cfg1 := NewConfig().includeDir(tempDir)
cfg1.StartAt = "beginning"
Expand Down Expand Up @@ -1598,3 +1583,78 @@ func TestReadGzipCompressedLogsFromEnd(t *testing.T) {
operator.poll(context.TODO())
sink.ExpectToken(t, []byte("testlog4"))
}

func TestIncludeFileRecordNumber(t *testing.T) {
t.Parallel()

tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
cfg.IncludeFileRecordNumber = true
operator, sink := testManager(t, cfg)

// Create a file, then start
temp := filetest.OpenTemp(t, tempDir)
filetest.WriteString(t, temp, "testlog1\n")

require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister()))
defer func() {
require.NoError(t, operator.Stop())
}()

sink.ExpectCall(t, []byte("testlog1"), map[string]any{
attrs.LogFileName: filepath.Base(temp.Name()),
attrs.LogFileRecordNumber: int64(1),
})
}

func TestIncludeFileRecordNumberWithHeaderConfigured(t *testing.T) {
t.Parallel()

tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
cfg.IncludeFileRecordNumber = true
cfg = cfg.withHeader("^#", "(?P<header_attr>[A-z]+)")
operator, sink := testManager(t, cfg)

// Create a file, then start
temp := filetest.OpenTemp(t, tempDir)
filetest.WriteString(t, temp, "#abc\n#xyz: headerValue2\ntestlog1\n")

require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister()))
defer func() {
require.NoError(t, operator.Stop())
}()

sink.ExpectCall(t, []byte("testlog1"), map[string]any{
attrs.LogFileName: filepath.Base(temp.Name()),
attrs.LogFileRecordNumber: int64(1),
"header_attr": "xyz",
})
}

func TestIncludeFileRecordNumberWithHeaderConfiguredButMissing(t *testing.T) {
t.Parallel()

tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
cfg.IncludeFileRecordNumber = true
cfg = cfg.withHeader("^#", "(?P<header_key>[A-z]+): (?P<header_value>[A-z]+)")
operator, sink := testManager(t, cfg)

// Create a file, then start
temp := filetest.OpenTemp(t, tempDir)
filetest.WriteString(t, temp, "testlog1\n")

require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister()))
defer func() {
require.NoError(t, operator.Stop())
}()

sink.ExpectCall(t, []byte("testlog1"), map[string]any{
attrs.LogFileName: filepath.Base(temp.Name()),
attrs.LogFileRecordNumber: int64(1),
})
}
119 changes: 85 additions & 34 deletions pkg/stanza/fileconsumer/internal/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
// SectionReader can only read a fixed window (from previous offset to EOF).
info, err := r.file.Stat()
if err != nil {
r.set.Logger.Error("Failed to stat", zap.Error(err))
r.set.Logger.Error("failed to stat", zap.Error(err))
return
}
currentEOF := info.Size()
Expand All @@ -80,7 +80,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
gzipReader, err := gzip.NewReader(io.NewSectionReader(r.file, r.Offset, currentEOF))
if err != nil {
if !errors.Is(err, io.EOF) {
r.set.Logger.Error("Failed to create gzip reader", zap.Error(err))
r.set.Logger.Error("failed to create gzip reader", zap.Error(err))
}
return
} else {
Expand All @@ -96,7 +96,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
}

if _, err := r.file.Seek(r.Offset, 0); err != nil {
r.set.Logger.Error("Failed to seek", zap.Error(err))
r.set.Logger.Error("failed to seek", zap.Error(err))
return
}

Expand All @@ -106,9 +106,85 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
}
}()

if r.headerReader != nil {
if r.readHeader(ctx) {
return
}
}

r.readContents(ctx)
}

func (r *Reader) readHeader(ctx context.Context) (doneReadingFile bool) {
s := scanner.New(r, r.maxLogSize, r.initialBufferSize, r.Offset, r.splitFunc)

// Read the tokens from the file until no more header tokens are found or the end of file is reached.
for {
select {
case <-ctx.Done():
return true
default:
}

ok := s.Scan()
if !ok {
if err := s.Error(); err != nil {
r.set.Logger.Error("failed during header scan", zap.Error(err))
} else {
r.set.Logger.Debug("end of file reached", zap.Bool("delete_at_eof", r.deleteAtEOF))
if r.deleteAtEOF {
r.delete()
}
}
// Either end of file was reached, or file cannot be scanned.
return true
}

token, err := r.decoder.Decode(s.Bytes())
if err != nil {
r.set.Logger.Error("failed to decode header token", zap.Error(err))
r.Offset = s.Pos() // move past the bad token or we may be stuck
continue
}

err = r.headerReader.Process(ctx, token, r.FileAttributes)
if err != nil {
if errors.Is(err, header.ErrEndOfHeader) {
// End of header reached.
break
}
r.set.Logger.Error("failed to process header token", zap.Error(err))
}

r.Offset = s.Pos()
}

// Clean up the header machinery
if err := r.headerReader.Stop(); err != nil {
r.set.Logger.Error("failed to stop header pipeline during finalization", zap.Error(err))
}
r.headerReader = nil
r.HeaderFinalized = true
r.initialBufferSize = scanner.DefaultBufferSize

// Switch to the normal split and process functions.
r.splitFunc = r.lineSplitFunc
r.processFunc = r.emitFunc

// Reset position in file to r.Offest after the header scanner might have moved it past a content token.
if _, err := r.file.Seek(r.Offset, 0); err != nil {
r.set.Logger.Error("failed to seek post-header", zap.Error(err))
return true
}

return false
}

func (r *Reader) readContents(ctx context.Context) {
// Create the scanner to read the contents of the file.
s := scanner.New(r, r.maxLogSize, r.initialBufferSize, r.Offset, r.splitFunc)

// Iterate over the tokenized file, emitting entries as we go
// Iterate over the contents of the file.
for {
select {
case <-ctx.Done():
Expand All @@ -119,7 +195,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
ok := s.Scan()
if !ok {
if err := s.Error(); err != nil {
r.set.Logger.Error("Failed during scan", zap.Error(err))
r.set.Logger.Error("failed during scan", zap.Error(err))
} else if r.deleteAtEOF {
r.delete()
}
Expand All @@ -128,7 +204,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) {

token, err := r.decoder.Decode(s.Bytes())
if err != nil {
r.set.Logger.Error("Failed to decode token", zap.Error(err))
r.set.Logger.Error("failed to decode token", zap.Error(err))
r.Offset = s.Pos() // move past the bad token or we may be stuck
continue
}
Expand All @@ -139,36 +215,11 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
}

err = r.processFunc(ctx, token, r.FileAttributes)
if err == nil {
r.Offset = s.Pos() // successful emit, update offset
continue
}

if !errors.Is(err, header.ErrEndOfHeader) {
r.set.Logger.Error("Failed to process token", zap.Error(err))
r.Offset = s.Pos() // move past the bad token or we may be stuck
continue
if err != nil {
r.set.Logger.Error("failed to process token", zap.Error(err))
}

// Clean up the header machinery
if err = r.headerReader.Stop(); err != nil {
r.set.Logger.Error("Failed to stop header pipeline during finalization", zap.Error(err))
}
r.headerReader = nil
r.HeaderFinalized = true

// Switch to the normal split and process functions.
r.splitFunc = r.lineSplitFunc
r.processFunc = r.emitFunc

// Recreate the scanner with the normal split func.
// Do not use the updated offset from the old scanner, as the most recent token
// could be split differently with the new splitter.
if _, err = r.file.Seek(r.Offset, 0); err != nil {
r.set.Logger.Error("Failed to seek post-header", zap.Error(err))
return
}
s = scanner.New(r, r.maxLogSize, scanner.DefaultBufferSize, r.Offset, r.splitFunc)
r.Offset = s.Pos()
}
}

Expand Down

0 comments on commit ed302a3

Please sign in to comment.