diff --git a/pkg/stanza/fileconsumer/internal/tracker/tracker.go b/pkg/stanza/fileconsumer/internal/tracker/tracker.go
index 7c203a3ced39..bc4fb9e7088b 100644
--- a/pkg/stanza/fileconsumer/internal/tracker/tracker.go
+++ b/pkg/stanza/fileconsumer/internal/tracker/tracker.go
@@ -65,24 +65,8 @@ func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int, pollsToA
 		knownFiles[i] = fileset.New[*reader.Metadata](maxBatchFiles)
 	}
 	set.Logger = set.Logger.With(zap.String("tracker", "fileTracker"))
-	archiveIndex := 0
-	if persister != nil && pollsToArchive > 0 {
-		byteIndex, err := persister.Get(context.Background(), archiveIndexKey)
-		if err != nil {
-			set.Logger.Error("error while reading the archiveIndexKey. Starting from 0", zap.Error(err))
-		}
-		archiveIndex, err = byteToIndex(byteIndex)
-		if err != nil {
-			set.Logger.Error("error getting read index. Starting from 0", zap.Error(err))
-			archiveIndex = 0
-		} else if archiveIndex < 0 || archiveIndex >= pollsToArchive {
-			// safety check. It can happen if `polls_to_archive` was changed.
-			// It's best if we reset the index or else we might end up writing invalid keys
-			set.Logger.Warn("the read index was found, but it exceeds the bounds. Starting from 0")
-			archiveIndex = 0
-		}
-	}
-	return &fileTracker{
+
+	t := &fileTracker{
 		set:              set,
 		maxBatchFiles:    maxBatchFiles,
 		currentPollFiles: fileset.New[*reader.Reader](maxBatchFiles),
@@ -90,8 +74,11 @@ func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int, pollsToA
 		knownFiles:       knownFiles,
 		pollsToArchive:   pollsToArchive,
 		persister:        persister,
-		archiveIndex:     archiveIndex,
+		archiveIndex:     0,
 	}
+	t.restoreArchiveIndex()
+
+	return t
 }
 
 func (t *fileTracker) Add(reader *reader.Reader) {
@@ -168,6 +155,28 @@ func (t *fileTracker) TotalReaders() int {
 	return total
 }
 
+func (t *fileTracker) restoreArchiveIndex() {
+	if !t.archiveEnabled() {
+		return
+	}
+	archiveIndex := 0
+	byteIndex, err := t.persister.Get(context.Background(), archiveIndexKey)
+	if err != nil {
+		t.set.Logger.Error("error while reading the archiveIndexKey. Starting from 0", zap.Error(err))
+	}
+	archiveIndex, err = byteToIndex(byteIndex)
+	if err != nil {
+		t.set.Logger.Error("error getting read index. Starting from 0", zap.Error(err))
+		archiveIndex = 0
+	} else if archiveIndex < 0 || archiveIndex >= t.pollsToArchive {
+		// Safety check: this can happen if `polls_to_archive` was changed between runs.
+		// Reset the index, or else we might end up writing keys outside the valid range.
+		t.set.Logger.Warn("the read index was found, but it exceeds the bounds. Starting from 0")
+		archiveIndex = 0
+	}
+	t.archiveIndex = archiveIndex
+}
+
 func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) {
 	// We make use of a ring buffer, where each set of files is stored under a specific index.
 	// Instead of discarding knownFiles[2], write it to the next index and eventually roll over.
@@ -186,7 +195,7 @@ func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) {
 	// start
 	// index
 
-	if t.pollsToArchive <= 0 || t.persister == nil {
+	if !t.archiveEnabled() {
 		return
 	}
 	index := t.archiveIndex
@@ -215,6 +224,10 @@ func (t *fileTracker) writeArchive(index int, rmds *fileset.Fileset[*reader.Meta
 	return checkpoint.SaveKey(context.Background(), t.persister, rmds.Get(), key, ops...)
 }
 
+func (t *fileTracker) archiveEnabled() bool {
+	return t.pollsToArchive > 0 && t.persister != nil
+}
+
 // FindFiles goes through archive, one fileset at a time and tries to match all fingerprints against that loaded set.
 func (t *fileTracker) FindFiles(fps []*fingerprint.Fingerprint) []*reader.Metadata {
 	// To minimize disk access, we first access the index, then review unmatched files and update the metadata, if found.
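
For reference, the ring-buffer behavior described in the `archive` comment, together with the bounds check that `restoreArchiveIndex` applies to the persisted index, can be sketched in isolation. This is a minimal illustration under stated assumptions, not the component's actual code: the `ringArchive` type and the `storageKey` format are invented for the example, and the real tracker persists filesets through `checkpoint.SaveKey` with a `persister` rather than in memory.

package main

import "fmt"

// ringArchive is a hypothetical stand-in for the tracker's archive state:
// a write index that wraps around after pollsToArchive slots, so the newest
// fileset eventually overwrites the oldest one.
type ringArchive struct {
	pollsToArchive int
	index          int // next slot to write; persisted across restarts
}

// restore mirrors the bounds check in restoreArchiveIndex: a persisted
// index outside [0, pollsToArchive), e.g. because polls_to_archive was
// lowered between runs, is reset to 0.
func (r *ringArchive) restore(persisted int) {
	if persisted < 0 || persisted >= r.pollsToArchive {
		persisted = 0
	}
	r.index = persisted
}

// storageKey is an assumed key format for illustration only; the real
// component derives its storage keys elsewhere.
func (r *ringArchive) storageKey() string {
	return fmt.Sprintf("archive%d", r.index)
}

// advance moves the write index forward, rolling over at the end.
func (r *ringArchive) advance() {
	r.index = (r.index + 1) % r.pollsToArchive
}

func main() {
	r := &ringArchive{pollsToArchive: 3}
	r.restore(7) // out of bounds: reset to 0
	for i := 0; i < 5; i++ {
		fmt.Println(r.storageKey()) // archive0, archive1, archive2, archive0, archive1
		r.advance()
	}
}

The sketch shows why the bounds check matters: once the index is used to build a storage key, an out-of-range value would write under a key that the rollover arithmetic never revisits, leaving stale data behind.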