diff --git a/pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go b/pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go index b9476fb3d5e9..8a5a60b7d734 100644 --- a/pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go +++ b/pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go @@ -47,7 +47,11 @@ func SaveKey(ctx context.Context, persister operator.Persister, rmds []*reader.M // Load loads the most recent set of files to the database func Load(ctx context.Context, persister operator.Persister) ([]*reader.Metadata, error) { - encoded, err := persister.Get(ctx, knownFilesKey) + return LoadKey(ctx, persister, knownFilesKey) +} + +func LoadKey(ctx context.Context, persister operator.Persister, key string) ([]*reader.Metadata, error) { + encoded, err := persister.Get(ctx, key) if err != nil { return nil, err } diff --git a/pkg/stanza/fileconsumer/internal/tracker/tracker.go b/pkg/stanza/fileconsumer/internal/tracker/tracker.go index 54bf5e9e12c1..c784d1c3485a 100644 --- a/pkg/stanza/fileconsumer/internal/tracker/tracker.go +++ b/pkg/stanza/fileconsumer/internal/tracker/tracker.go @@ -31,6 +31,7 @@ type Tracker interface { EndPoll() EndConsume() int TotalReaders() int + FindFiles([]*fingerprint.Fingerprint) []*reader.Metadata } // fileTracker tracks known offsets for files that are being consumed by the manager. @@ -164,13 +165,80 @@ func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) { if t.pollsToArchive <= 0 || t.persister == nil { return } - key := fmt.Sprintf("knownFiles%d", t.archiveIndex) - if err := checkpoint.SaveKey(context.Background(), t.persister, metadata.Get(), key); err != nil { + if err := t.writeArchive(t.archiveIndex, metadata); err != nil { t.set.Logger.Error("error faced while saving to the archive", zap.Error(err)) } t.archiveIndex = (t.archiveIndex + 1) % t.pollsToArchive // increment the index } +// readArchive loads data from the archive for a given index and returns a fileset.Filset. +func (t *fileTracker) readArchive(index int) (*fileset.Fileset[*reader.Metadata], error) { + key := fmt.Sprintf("knownFiles%d", index) + metadata, err := checkpoint.LoadKey(context.Background(), t.persister, key) + if err != nil { + return nil, err + } + f := fileset.New[*reader.Metadata](len(metadata)) + f.Add(metadata...) + return f, nil +} + +// writeArchive saves data to the archive for a given index and returns an error, if encountered. +func (t *fileTracker) writeArchive(index int, rmds *fileset.Fileset[*reader.Metadata]) error { + key := fmt.Sprintf("knownFiles%d", index) + return checkpoint.SaveKey(context.Background(), t.persister, rmds.Get(), key) +} + +// FindFiles goes through archive, one fileset at a time and tries to match all fingerprints against that loaded set. +func (t *fileTracker) FindFiles(fps []*fingerprint.Fingerprint) []*reader.Metadata { + // To minimize disk access, we first access the index, then review unmatched files and update the metadata, if found. + // We exit if all fingerprints are matched. + + // Track number of matched fingerprints so we can exit if all matched. + var numMatched int + + // Determine the index for reading archive, starting from the most recent and moving towards the oldest + nextIndex := t.archiveIndex + matchedMetadata := make([]*reader.Metadata, len(fps)) + + // continue executing the loop until either all records are matched or all archive sets have been processed. + for i := 0; i < t.pollsToArchive; i++ { + // Update the mostRecentIndex + nextIndex = (nextIndex - 1 + t.pollsToArchive) % t.pollsToArchive + + data, err := t.readArchive(nextIndex) // we load one fileset atmost once per poll + if err != nil { + t.set.Logger.Error("error while opening archive", zap.Error(err)) + continue + } + archiveModified := false + for j, fp := range fps { + if matchedMetadata[j] != nil { + // we've already found a match for this index, continue + continue + } + if md := data.Match(fp, fileset.StartsWith); md != nil { + // update the matched metada for the index + matchedMetadata[j] = md + archiveModified = true + numMatched++ + } + } + if !archiveModified { + continue + } + // we save one fileset atmost once per poll + if err := t.writeArchive(nextIndex, data); err != nil { + t.set.Logger.Error("error while opening archive", zap.Error(err)) + } + // Check if all metadata have been found + if numMatched == len(fps) { + return matchedMetadata + } + } + return matchedMetadata +} + // noStateTracker only tracks the current polled files. Once the poll is // complete and telemetry is consumed, the tracked files are closed. The next // poll will create fresh readers with no previously tracked offsets. @@ -225,3 +293,5 @@ func (t *noStateTracker) ClosePreviousFiles() int { return 0 } func (t *noStateTracker) EndPoll() {} func (t *noStateTracker) TotalReaders() int { return 0 } + +func (t *noStateTracker) FindFiles([]*fingerprint.Fingerprint) []*reader.Metadata { return nil } diff --git a/pkg/stanza/fileconsumer/internal/tracker/tracker_test.go b/pkg/stanza/fileconsumer/internal/tracker/tracker_test.go new file mode 100644 index 000000000000..f16e2d647032 --- /dev/null +++ b/pkg/stanza/fileconsumer/internal/tracker/tracker_test.go @@ -0,0 +1,62 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package tracker // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker" + +import ( + "context" + "math/rand/v2" + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" +) + +func TestFindFilesOrder(t *testing.T) { + fps := make([]*fingerprint.Fingerprint, 0) + for i := 0; i < 100; i++ { + fps = append(fps, fingerprint.New([]byte(uuid.NewString()))) + } + persister := testutil.NewUnscopedMockPersister() + fpInStorage := populatedPersisterData(persister, fps) + + tracker := NewFileTracker(componenttest.NewNopTelemetrySettings(), 0, 100, persister) + matchables := tracker.FindFiles(fps) + + require.Equal(t, len(fps), len(matchables), "return slice should be of same length as input slice") + + for i := 0; i < len(matchables); i++ { + if fpInStorage[i] { + // if current fingerprint is present in storage, the corresponding return type should not be nil + require.NotNilf(t, matchables[i], "resulting index %d should be not be nil type", i) + require.Truef(t, fps[i].Equal(matchables[i].GetFingerprint()), "fingerprint at index %d is not equal to corresponding return value", i) + } else { + // if current fingerprint is absent from storage, the corresponding index should be empty i.e. "nil" + require.Nil(t, matchables[i], "resulting index %d should be of nil type", i) + } + } +} + +func populatedPersisterData(persister operator.Persister, fps []*fingerprint.Fingerprint) []bool { + md := make([]*reader.Metadata, 0) + + fpInStorage := make([]bool, len(fps)) + for i, fp := range fps { + // 50-50 chance that a fingerprint exists in the storage + if rand.Float32() < 0.5 { + md = append(md, &reader.Metadata{Fingerprint: fp}) + fpInStorage[i] = true // mark the fingerprint at index i in storage + } + } + // save half keys in knownFiles0 and other half in knownFiles1 + _ = checkpoint.SaveKey(context.Background(), persister, md[:len(md)/2], "knownFiles0") + _ = checkpoint.SaveKey(context.Background(), persister, md[len(md)/2:], "knownFiles1") + return fpInStorage +} diff --git a/pkg/stanza/go.mod b/pkg/stanza/go.mod index 957df12615fb..bbab8cdaf78a 100644 --- a/pkg/stanza/go.mod +++ b/pkg/stanza/go.mod @@ -8,6 +8,7 @@ require ( github.com/expr-lang/expr v1.16.9 github.com/fsnotify/fsnotify v1.8.0 github.com/goccy/go-json v0.10.3 + github.com/google/uuid v1.6.0 github.com/jonboulle/clockwork v0.4.0 github.com/jpillora/backoff v1.0.0 github.com/json-iterator/go v1.1.12 @@ -50,7 +51,6 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/go-viper/mapstructure/v2 v2.2.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/go-version v1.7.0 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect