Skip to content

Commit

Permalink
chore: cleanup, test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
VihasMakwana committed Dec 2, 2024
1 parent 2dac57f commit 9981c78
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 29 deletions.
4 changes: 0 additions & 4 deletions pkg/stanza/fileconsumer/internal/fingerprint/fingerprint.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,6 @@ func (f *Fingerprint) UnmarshalJSON(data []byte) error {
return nil
}

func (f *Fingerprint) GetFingerprint() *Fingerprint {
return f
}

type marshal struct {
FirstBytes []byte `json:"first_bytes"`
}
30 changes: 12 additions & 18 deletions pkg/stanza/fileconsumer/internal/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Tracker interface {
EndPoll()
EndConsume() int
TotalReaders() int
FindFiles([]*fingerprint.Fingerprint) []fileset.Matchable
FindFiles([]*fingerprint.Fingerprint) []*reader.Metadata
}

// fileTracker tracks known offsets for files that are being consumed by the manager.
Expand Down Expand Up @@ -191,31 +191,29 @@ func (t *fileTracker) writeArchive(index int, rmds *fileset.Fileset[*reader.Meta
}

// FindFiles goes through archive, one fileset at a time and tries to match all fingerprints against that loaded set.
func (t *fileTracker) FindFiles(fps []*fingerprint.Fingerprint) []fileset.Matchable {
func (t *fileTracker) FindFiles(fps []*fingerprint.Fingerprint) []*reader.Metadata {
// To minimize disk access, we first access the index, then review unmatched files and update the metadata, if found.
// We exit if all fingerprints are matched.

mostRecentIndex := util.Mod(t.archiveIndex-1, t.pollsToArchive)
matchedMetadata := make([]fileset.Matchable, len(fps))
indices := make(map[int]bool) // Track fp indices of original fps slice

for i := 0; i < len(fps); i++ {
indices[i] = true
}
matchedMetadata := make([]*reader.Metadata, len(fps))

// continue executing the loop until either all records are matched or all archive sets have been processed.
for i := 0; i < t.pollsToArchive && len(indices) > 0; i, mostRecentIndex = i+1, util.Mod(mostRecentIndex-1, t.pollsToArchive) {
for i := 0; i < t.pollsToArchive; i, mostRecentIndex = i+1, util.Mod(mostRecentIndex-1, t.pollsToArchive) {
modified := false
data, err := t.readArchive(mostRecentIndex) // we load one fileset at most once per poll
if err != nil {
t.set.Logger.Error("error while opening archive", zap.Error(err))
continue
}
for index := range indices {
if md := data.Match(fps[index], fileset.StartsWith); md != nil {
// update the matched metadata for this index
for index, fp := range fps {
if matchedMetadata[index] != nil {
// we've already found a match for this index, continue
continue
}
if md := data.Match(fp, fileset.StartsWith); md != nil {
// update the matched metadata for the index
matchedMetadata[index] = md
delete(indices, index)
modified = true
}
}
Expand All @@ -226,10 +224,6 @@ func (t *fileTracker) FindFiles(fps []*fingerprint.Fingerprint) []fileset.Matcha
}
}
}
// add remaining fingerprints i.e. unmatched fingerprints
for index := range indices {
matchedMetadata[index] = fps[index]
}
return matchedMetadata
}

Expand Down Expand Up @@ -288,4 +282,4 @@ func (t *noStateTracker) EndPoll() {}

func (t *noStateTracker) TotalReaders() int { return 0 }

func (t *noStateTracker) FindFiles([]*fingerprint.Fingerprint) []fileset.Matchable { return nil }
func (t *noStateTracker) FindFiles([]*fingerprint.Fingerprint) []*reader.Metadata { return nil }
12 changes: 5 additions & 7 deletions pkg/stanza/fileconsumer/internal/tracker/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,13 @@ func TestFindFilesOrder(t *testing.T) {
require.Equal(t, len(fps), len(matchables), "return slice should be of same length as input slice")

for i := 0; i < len(matchables); i++ {
require.Truef(t, fps[i].Equal(matchables[i].GetFingerprint()), "fingerprint at index %d is not equal to corresponding return value", i)
if fpInStorage[i] {
// if current fingerprint is present in storage, the corresponding return type should be a "Metadata"
_, ok := matchables[i].(*reader.Metadata)
require.True(t, ok, "resulting index %d should be of reader.Metadata type", i)
// if current fingerprint is present in storage, the corresponding return value should not be nil
require.NotNilf(t, matchables[i], "resulting index %d should be not be nil type", i)
require.Truef(t, fps[i].Equal(matchables[i].GetFingerprint()), "fingerprint at index %d is not equal to corresponding return value", i)
} else {
// if current fingerprint is absent from storage, the corresponding return type should be a "Fingerprint"
_, ok := matchables[i].(*fingerprint.Fingerprint)
require.True(t, ok, "resulting index %d should be of fingerprint.Fingerprint type", i)
// if current fingerprint is absent from storage, the corresponding index should be empty i.e. "nil"
require.Nil(t, matchables[i], "resulting index %d should be of nil type", i)
}
}
}
Expand Down

0 comments on commit 9981c78

Please sign in to comment.