diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go
index e351452063bd..c5675a90a063 100644
--- a/pkg/stanza/fileconsumer/file.go
+++ b/pkg/stanza/fileconsumer/file.go
@@ -230,7 +230,6 @@ func (m *Manager) makeReaders(ctx context.Context, paths []string) {
 
 		m.tracker.Add(r)
 	}
-	m.tracker.SyncOffsets()
 }
 
 func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {
diff --git a/pkg/stanza/fileconsumer/internal/reader/factory.go b/pkg/stanza/fileconsumer/internal/reader/factory.go
index 97f286db796d..c4bc84ece702 100644
--- a/pkg/stanza/fileconsumer/internal/reader/factory.go
+++ b/pkg/stanza/fileconsumer/internal/reader/factory.go
@@ -64,7 +64,6 @@ func (f *Factory) NewReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader
 	if err != nil {
 		return nil, err
 	}
-	r.new = true // indicates that a reader is new (no previously known offset)
 	return r, nil
 }
 
diff --git a/pkg/stanza/fileconsumer/internal/reader/reader.go b/pkg/stanza/fileconsumer/internal/reader/reader.go
index 5a3c0414339f..40a74a5485f2 100644
--- a/pkg/stanza/fileconsumer/internal/reader/reader.go
+++ b/pkg/stanza/fileconsumer/internal/reader/reader.go
@@ -53,7 +53,6 @@ type Reader struct {
 	includeFileRecordNum bool
 	compression          string
 	acquireFSLock        bool
-	new                  bool // indicates that a reader is new (no previously known offset)
 }
 
 // ReadToEnd will read until the end of the file
@@ -240,9 +239,6 @@ func (r *Reader) GetFileName() string {
 	return r.fileName
 }
 
-func (r *Reader) IsNew() bool {
-	return r.new
-}
 func (m Metadata) GetFingerprint() *fingerprint.Fingerprint {
 	return m.Fingerprint
 }
@@ -264,5 +260,4 @@ func (r *Reader) updateFingerprint() {
 
 func (r *Reader) SyncMetadata(m *Metadata) {
 	r.Metadata = m
-	r.new = false
 }
diff --git a/pkg/stanza/fileconsumer/internal/tracker/tracker.go b/pkg/stanza/fileconsumer/internal/tracker/tracker.go
index 25041419f495..bd45cfc20b23 100644
--- a/pkg/stanza/fileconsumer/internal/tracker/tracker.go
+++ b/pkg/stanza/fileconsumer/internal/tracker/tracker.go
@@ -6,6 +6,7 @@ package tracker // import "github.com/open-telemetry/opentelemetry-collector-con
 import (
 	"context"
 	"fmt"
+	"os"
 
 	"go.opentelemetry.io/collector/component"
 	"go.uber.org/zap"
@@ -17,6 +18,12 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
 )
 
+type Record struct {
+	File        *os.File
+	Fingerprint *fingerprint.Fingerprint
+	Metadata    *reader.Metadata
+}
+
 // Interface for tracking files that are being consumed.
 type Tracker interface {
 	Add(reader *reader.Reader)
@@ -31,7 +38,7 @@ type Tracker interface {
 	EndPoll()
 	EndConsume() int
 	TotalReaders() int
-	SyncOffsets()
+	FindFiles(records []*Record)
 }
 
 // fileTracker tracks known offsets for files that are being consumed by the manager.
@@ -165,14 +172,14 @@ func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) {
 	if t.pollsToArchive <= 0 || t.persister == nil {
 		return
 	}
-	if err := t.updateArchive(t.archiveIndex, metadata); err != nil {
+	if err := t.writeArchive(t.archiveIndex, metadata); err != nil {
 		t.set.Logger.Error("error faced while saving to the archive", zap.Error(err))
 	}
 	t.archiveIndex = (t.archiveIndex + 1) % t.pollsToArchive // increment the index
 }
 
+// readArchive loads data from the archive for a given index and returns a fileset.Filset.
 func (t *fileTracker) readArchive(index int) (*fileset.Fileset[*reader.Metadata], error) {
-	// readArchive loads data from the archive for a given index and returns a fileset.Filset.
 	key := fmt.Sprintf("knownFiles%d", index)
 	metadata, err := checkpoint.LoadKey(context.Background(), t.persister, key)
 	if err != nil {
@@ -183,45 +190,45 @@ func (t *fileTracker) readArchive(index int) (*fileset.Fileset[*reader.Metadata]
 	return f, nil
 }
 
-func (t *fileTracker) updateArchive(index int, rmds *fileset.Fileset[*reader.Metadata]) error {
-	// updateArchive saves data to the archive for a given index and returns an error, if encountered.
+// writeArchive saves data to the archive for a given index and returns an error, if encountered.
+func (t *fileTracker) writeArchive(index int, rmds *fileset.Fileset[*reader.Metadata]) error {
 	key := fmt.Sprintf("knownFiles%d", index)
 	return checkpoint.SaveKey(context.Background(), t.persister, rmds.Get(), key)
 }
 
-func (t *fileTracker) SyncOffsets() {
-	// SyncOffsets goes through all new (unmatched) readers and updates the metadata, if found on archive.
+func (t *fileTracker) FindFiles(records []*Record) {
+	// FindFiles goes through archive, one fileset at a time and tries to match all fingerprints agains that loaded set.
 
-	// To minimize disk access, we first access the index, then review unmatched readers and synchronize their metadata if a match is found.
+	// To minimize disk access, we first access the index, then review unmatched files and update the metadata, if found.
 	// We exit if no new reader exists.
 
-	archiveReadIndex := t.archiveIndex - 1 // try loading most recently written index and iterate backwards
-	for i := 0; i < t.pollsToArchive; i++ {
-		newFound := false
-		data, err := t.readArchive(archiveReadIndex)
+	mostRecentIndex := t.archiveIndex - 1
+	foundRecords := 0
+
+	// continue executing the loop until either all records are matched or all archive sets have been processed.
+	for i := 0; i < t.pollsToArchive && foundRecords < len(records); i++ {
+		modified := false
+		data, err := t.readArchive(mostRecentIndex)
 		if err != nil {
 			t.set.Logger.Error("error while opening archive", zap.Error(err))
 			continue
 		}
 
-		for _, v := range t.currentPollFiles.Get() {
-			if v.IsNew() {
-				newFound = true
-				if md := data.Match(v.GetFingerprint(), fileset.StartsWith); md != nil {
-					v.SyncMetadata(md)
-				}
+		for _, record := range records {
+			if md := data.Match(record.Fingerprint, fileset.StartsWith); md != nil && record.Metadata != nil {
+				// update a record's metadata with the matched metadata.
+				modified = true
+				record.Metadata = md
+				foundRecords++
			}
 		}
 
-		if !newFound {
-			// No new reader is available, so there’s no need to go through the rest of the archive.
-			// Just exit to save time.
-			break
-		}
-		if err := t.updateArchive(archiveReadIndex, data); err != nil {
-			t.set.Logger.Error("error while opening archive", zap.Error(err))
-			continue
+		if modified {
+			if err := t.writeArchive(mostRecentIndex, data); err != nil {
+				t.set.Logger.Error("error while opening archive", zap.Error(err))
+				continue
+			}
 		}
-		archiveReadIndex = (archiveReadIndex - 1) % t.pollsToArchive
+		mostRecentIndex = (mostRecentIndex - 1) % t.pollsToArchive
 	}
 }
@@ -281,4 +288,4 @@ func (t *noStateTracker) EndPoll() {}
 
 func (t *noStateTracker) TotalReaders() int { return 0 }
 
-func (t *noStateTracker) SyncOffsets() {}
+func (t *noStateTracker) FindFiles([]*Record) {}
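
For orientation, here is a minimal, hypothetical sketch of how a caller could drive the new Tracker.FindFiles API once it is wired up. The wiring itself is not part of this patch (the old m.tracker.SyncOffsets() call is removed above without a replacement call site), and the helper name restoreFromArchive plus the way records are seeded are assumptions made for the example, not code from the PR.

package fileconsumer

import (
	"os"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker"
)

// restoreFromArchive is a hypothetical helper (not part of this patch). It builds
// one tracker.Record per open file and asks the tracker to replace each record's
// Metadata with an archived entry whose fingerprint is a prefix match.
func restoreFromArchive(t tracker.Tracker, files []*os.File, fps []*fingerprint.Fingerprint) []*tracker.Record {
	records := make([]*tracker.Record, 0, len(files))
	for i, f := range files {
		records = append(records, &tracker.Record{
			File:        f,
			Fingerprint: fps[i],
			// Seed with fresh metadata; FindFiles overwrites it on an archive hit.
			Metadata: &reader.Metadata{Fingerprint: fps[i]},
		})
	}
	// FindFiles walks the archived filesets from the most recently written index
	// backwards and stops once every record is matched or the archive is exhausted.
	t.FindFiles(records)
	return records
}

The shift visible in the diff is from reader-centric syncing (SyncOffsets inspecting currentPollFiles and IsNew) to a caller-supplied slice of Records, which lets the tracker batch its archive reads and stop as soon as every record has been matched.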