From ed302a3c7678143a0ee5f78be850817d7cf6584b Mon Sep 17 00:00:00 2001
From: Andrzej Stencel
Date: Mon, 28 Oct 2024 18:34:13 +0100
Subject: [PATCH] [receiver/filelog] fix record counting with header (#35870)

#### Description

Fixes https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/35869 by refactoring the `Reader::ReadToEnd` method, separating reading the file's header from reading the file's contents. This results in very similar code in the `readHeader` and `readContents` methods; that duplication was previously avoided at the cost of slightly higher complexity.

The bug could have been fixed without separating header reading from contents reading, but I hope this separation will make it easier to implement content batching in the Reader (https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/35455). Content batching was my original motivation for these code changes; I only discovered the problem with record counting while reading the code.
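To illustrate the user-facing impact, here is a minimal configuration sketch that exercises the fixed code path. The field names are the filelog receiver's documented settings; the file path and the regex are invented for this example:

```yaml
receivers:
  filelog:
    include: [/var/log/example.log] # hypothetical path
    start_at: beginning
    include_file_record_number: true
    header:
      pattern: "^#" # hypothetical header marker
      metadata_operators:
        - type: regex_parser
          regex: "^#(?P<key>[A-z]+): (?P<value>[A-z]+)" # hypothetical
```

With such a configuration, lines matching `pattern` only contribute metadata attributes and are not counted as records: the first content line after the header is expected to carry `log.file.record_number: 1`. The new `TestIncludeFileRecordNumber*` tests below pin this behavior down.

#### Link to tracking issue

Fixes https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/35869

#### Testing

In the first commit I have added tests that document the erroneous behavior. In the second commit I have fixed the bug and corrected the tests.

#### Documentation

Added changelog entry.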
---
 .chloggen/refactor-reader-header-reading.yaml |  27 ++++
 pkg/stanza/fileconsumer/config_test.go        |   6 -
 pkg/stanza/fileconsumer/file_test.go          |  90 ++++++++++---
 .../fileconsumer/internal/reader/reader.go    | 119 +++++++++++++-----
 4 files changed, 187 insertions(+), 55 deletions(-)
 create mode 100644 .chloggen/refactor-reader-header-reading.yaml

diff --git a/.chloggen/refactor-reader-header-reading.yaml b/.chloggen/refactor-reader-header-reading.yaml
new file mode 100644
index 000000000000..845e44be66e7
--- /dev/null
+++ b/.chloggen/refactor-reader-header-reading.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: receiver/filelog
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: fix record counting with header
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [35869]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []
diff --git a/pkg/stanza/fileconsumer/config_test.go b/pkg/stanza/fileconsumer/config_test.go
index 145c43491266..bd3254870d04 100644
--- a/pkg/stanza/fileconsumer/config_test.go
+++ b/pkg/stanza/fileconsumer/config_test.go
@@ -13,7 +13,6 @@ import (
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/component/componenttest"
-	"go.opentelemetry.io/collector/featuregate"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/emittest"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
@@ -724,11 +723,6 @@ func TestBuildWithSplitFunc(t *testing.T) {
 }
 
 func TestBuildWithHeader(t *testing.T) {
-	require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), true))
-	t.Cleanup(func() {
-		require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), false))
-	})
-
 	basicConfig := func() *Config {
 		cfg := NewConfig()
 		cfg.Include = []string{"/var/log/testpath.*"}
diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go
index 7739cf71153e..50cad57e821a 100644
--- a/pkg/stanza/fileconsumer/file_test.go
+++ b/pkg/stanza/fileconsumer/file_test.go
@@ -1151,11 +1151,6 @@ func TestMaxBatching(t *testing.T) {
 // TestReadExistingLogsWithHeader tests that, when starting from beginning, we
 // read all the lines that are already there, and parses the headers
 func TestReadExistingLogsWithHeader(t *testing.T) {
-	require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), true))
-	t.Cleanup(func() {
-		require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), false))
-	})
-
 	tempDir := t.TempDir()
 	cfg := NewConfig().includeDir(tempDir)
 	cfg.StartAt = "beginning"
@@ -1247,11 +1242,6 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) {
 }
 
 func TestHeaderPersistance(t *testing.T) {
-	require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), true))
-	t.Cleanup(func() {
-		require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), false))
-	})
-
 	tempDir := t.TempDir()
 	cfg := NewConfig().includeDir(tempDir)
 	cfg.StartAt = "beginning"
@@ -1287,11 +1277,6 @@ func TestHeaderPersistance(t *testing.T) {
 }
 
 func TestHeaderPersistanceInHeader(t *testing.T) {
-	require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), true))
-	t.Cleanup(func() {
-		require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), false))
-	})
-
 	tempDir := t.TempDir()
 	cfg1 := NewConfig().includeDir(tempDir)
 	cfg1.StartAt = "beginning"
@@ -1598,3 +1583,78 @@ func TestReadGzipCompressedLogsFromEnd(t *testing.T) {
 	operator.poll(context.TODO())
 	sink.ExpectToken(t, []byte("testlog4"))
 }
+
+func TestIncludeFileRecordNumber(t *testing.T) {
+	t.Parallel()
+
+	tempDir := t.TempDir()
+	cfg := NewConfig().includeDir(tempDir)
+	cfg.StartAt = "beginning"
+	cfg.IncludeFileRecordNumber = true
+	operator, sink := testManager(t, cfg)
+
+	// Create a file, then start
+	temp := filetest.OpenTemp(t, tempDir)
+	filetest.WriteString(t, temp, "testlog1\n")
+
+	require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister()))
+	defer func() {
+		require.NoError(t, operator.Stop())
+	}()
+
+	sink.ExpectCall(t, []byte("testlog1"), map[string]any{
+		attrs.LogFileName:         filepath.Base(temp.Name()),
+		attrs.LogFileRecordNumber: int64(1),
+	})
+}
+
+func TestIncludeFileRecordNumberWithHeaderConfigured(t *testing.T) {
+	t.Parallel()
+
+	tempDir := t.TempDir()
+	cfg := NewConfig().includeDir(tempDir)
+	cfg.StartAt = "beginning"
+	cfg.IncludeFileRecordNumber = true
+	cfg = cfg.withHeader("^#", "(?P<header_attr>[A-z]+)")
+	operator, sink := testManager(t, cfg)
+
+	// Create a file, then start
+	temp := filetest.OpenTemp(t, tempDir)
+	filetest.WriteString(t, temp, "#abc\n#xyz: headerValue2\ntestlog1\n")
+
+	require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister()))
+	defer func() {
+		require.NoError(t, operator.Stop())
+	}()
+
+	sink.ExpectCall(t, []byte("testlog1"), map[string]any{
+		attrs.LogFileName:         filepath.Base(temp.Name()),
+		attrs.LogFileRecordNumber: int64(1),
+		"header_attr":             "xyz",
+	})
+}
+
+func TestIncludeFileRecordNumberWithHeaderConfiguredButMissing(t *testing.T) {
+	t.Parallel()
+
+	tempDir := t.TempDir()
+	cfg := NewConfig().includeDir(tempDir)
+	cfg.StartAt = "beginning"
+	cfg.IncludeFileRecordNumber = true
+	cfg = cfg.withHeader("^#", "(?P<header_key>[A-z]+): (?P<header_value>[A-z]+)")
+	operator, sink := testManager(t, cfg)
+
+	// Create a file, then start
+	temp := filetest.OpenTemp(t, tempDir)
+	filetest.WriteString(t, temp, "testlog1\n")
+
+	require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister()))
+	defer func() {
+		require.NoError(t, operator.Stop())
+	}()
+
+	sink.ExpectCall(t, []byte("testlog1"), map[string]any{
+		attrs.LogFileName:         filepath.Base(temp.Name()),
+		attrs.LogFileRecordNumber: int64(1),
+	})
+}
diff --git a/pkg/stanza/fileconsumer/internal/reader/reader.go b/pkg/stanza/fileconsumer/internal/reader/reader.go
index e49b5ee4c381..3cb998597069 100644
--- a/pkg/stanza/fileconsumer/internal/reader/reader.go
+++ b/pkg/stanza/fileconsumer/internal/reader/reader.go
@@ -70,7 +70,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
 	// SectionReader can only read a fixed window (from previous offset to EOF).
 	info, err := r.file.Stat()
 	if err != nil {
-		r.set.Logger.Error("Failed to stat", zap.Error(err))
+		r.set.Logger.Error("failed to stat", zap.Error(err))
 		return
 	}
 	currentEOF := info.Size()
@@ -80,7 +80,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
 	gzipReader, err := gzip.NewReader(io.NewSectionReader(r.file, r.Offset, currentEOF))
 	if err != nil {
 		if !errors.Is(err, io.EOF) {
-			r.set.Logger.Error("Failed to create gzip reader", zap.Error(err))
+			r.set.Logger.Error("failed to create gzip reader", zap.Error(err))
 		}
 		return
 	} else {
@@ -96,7 +96,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
 	}
 
 	if _, err := r.file.Seek(r.Offset, 0); err != nil {
-		r.set.Logger.Error("Failed to seek", zap.Error(err))
+		r.set.Logger.Error("failed to seek", zap.Error(err))
 		return
 	}
 
@@ -106,9 +106,85 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
 		}
 	}()
 
+	if r.headerReader != nil {
+		if r.readHeader(ctx) {
+			return
+		}
+	}
+
+	r.readContents(ctx)
+}
+
+func (r *Reader) readHeader(ctx context.Context) (doneReadingFile bool) {
+	s := scanner.New(r, r.maxLogSize, r.initialBufferSize, r.Offset, r.splitFunc)
+
+	// Read the tokens from the file until no more header tokens are found or the end of file is reached.
+	for {
+		select {
+		case <-ctx.Done():
+			return true
+		default:
+		}
+
+		ok := s.Scan()
+		if !ok {
+			if err := s.Error(); err != nil {
+				r.set.Logger.Error("failed during header scan", zap.Error(err))
+			} else {
+				r.set.Logger.Debug("end of file reached", zap.Bool("delete_at_eof", r.deleteAtEOF))
+				if r.deleteAtEOF {
+					r.delete()
+				}
+			}
+			// Either end of file was reached, or file cannot be scanned.
+			return true
+		}
+
+		token, err := r.decoder.Decode(s.Bytes())
+		if err != nil {
+			r.set.Logger.Error("failed to decode header token", zap.Error(err))
+			r.Offset = s.Pos() // move past the bad token or we may be stuck
+			continue
+		}
+
+		err = r.headerReader.Process(ctx, token, r.FileAttributes)
+		if err != nil {
+			if errors.Is(err, header.ErrEndOfHeader) {
+				// End of header reached.
+				break
+			}
+			r.set.Logger.Error("failed to process header token", zap.Error(err))
+		}
+
+		r.Offset = s.Pos()
+	}
+
+	// Clean up the header machinery
+	if err := r.headerReader.Stop(); err != nil {
+		r.set.Logger.Error("failed to stop header pipeline during finalization", zap.Error(err))
+	}
+	r.headerReader = nil
+	r.HeaderFinalized = true
+	r.initialBufferSize = scanner.DefaultBufferSize
+
+	// Switch to the normal split and process functions.
+	r.splitFunc = r.lineSplitFunc
+	r.processFunc = r.emitFunc
+
+	// Reset position in file to r.Offset, as the header scanner might have moved it past a content token.
+	if _, err := r.file.Seek(r.Offset, 0); err != nil {
+		r.set.Logger.Error("failed to seek post-header", zap.Error(err))
+		return true
+	}
+
+	return false
+}
+
+func (r *Reader) readContents(ctx context.Context) {
+	// Create the scanner to read the contents of the file.
 	s := scanner.New(r, r.maxLogSize, r.initialBufferSize, r.Offset, r.splitFunc)
 
-	// Iterate over the tokenized file, emitting entries as we go
+	// Iterate over the contents of the file.
 	for {
 		select {
 		case <-ctx.Done():
@@ -119,7 +195,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
 		ok := s.Scan()
 		if !ok {
 			if err := s.Error(); err != nil {
-				r.set.Logger.Error("Failed during scan", zap.Error(err))
+				r.set.Logger.Error("failed during scan", zap.Error(err))
 			} else if r.deleteAtEOF {
 				r.delete()
 			}
@@ -128,7 +204,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
 
 		token, err := r.decoder.Decode(s.Bytes())
 		if err != nil {
-			r.set.Logger.Error("Failed to decode token", zap.Error(err))
+			r.set.Logger.Error("failed to decode token", zap.Error(err))
 			r.Offset = s.Pos() // move past the bad token or we may be stuck
 			continue
 		}
@@ -139,36 +215,11 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
 		}
 
 		err = r.processFunc(ctx, token, r.FileAttributes)
-		if err == nil {
-			r.Offset = s.Pos() // successful emit, update offset
-			continue
-		}
-
-		if !errors.Is(err, header.ErrEndOfHeader) {
-			r.set.Logger.Error("Failed to process token", zap.Error(err))
-			r.Offset = s.Pos() // move past the bad token or we may be stuck
-			continue
+		if err != nil {
+			r.set.Logger.Error("failed to process token", zap.Error(err))
 		}
 
-		// Clean up the header machinery
-		if err = r.headerReader.Stop(); err != nil {
-			r.set.Logger.Error("Failed to stop header pipeline during finalization", zap.Error(err))
-		}
-		r.headerReader = nil
-		r.HeaderFinalized = true
-
-		// Switch to the normal split and process functions.
-		r.splitFunc = r.lineSplitFunc
-		r.processFunc = r.emitFunc
-
-		// Recreate the scanner with the normal split func.
-		// Do not use the updated offset from the old scanner, as the most recent token
-		// could be split differently with the new splitter.
-		if _, err = r.file.Seek(r.Offset, 0); err != nil {
-			r.set.Logger.Error("Failed to seek post-header", zap.Error(err))
-			return
-		}
-		s = scanner.New(r, r.maxLogSize, scanner.DefaultBufferSize, r.Offset, r.splitFunc)
+		r.Offset = s.Pos()
 	}
 }