
[chore][fileconsumer/archive] - Add archive read logic #35798

Merged
merged 55 commits
Dec 9, 2024
Changes from all commits
Commits
ea3f57d
chore: add initial structure and signature methods
VihasMakwana Aug 6, 2024
17ec1aa
fix: add license
VihasMakwana Aug 6, 2024
26deb01
fix: lint
VihasMakwana Aug 6, 2024
fe1912c
fix: lint
VihasMakwana Aug 6, 2024
33566b0
fix: check
VihasMakwana Aug 6, 2024
5acb848
chore: add write method, update interface
VihasMakwana Aug 19, 2024
06e4a45
Merge branch 'main' into create-archive-storage
VihasMakwana Aug 22, 2024
431ab41
chore: commit, archive module
VihasMakwana Aug 22, 2024
077ab35
chore: cleanup, simplify the PR
VihasMakwana Sep 13, 2024
e3cdd5d
fix: initial commit, second PR
VihasMakwana Sep 16, 2024
d88d5ae
chore: remove stanza.go
VihasMakwana Sep 17, 2024
d9e5992
Format the comment
VihasMakwana Sep 17, 2024
c68817a
chore: use options
VihasMakwana Sep 17, 2024
39d9e86
fix: lint
VihasMakwana Sep 27, 2024
a6a010e
Merge branch 'main' into create-archive-storage-temp
VihasMakwana Sep 27, 2024
3490e7d
fix: bug
VihasMakwana Sep 27, 2024
87c2d2a
chore: rename function
VihasMakwana Sep 27, 2024
b668554
chore: rename function
VihasMakwana Sep 27, 2024
7adf335
Merge branch 'main' into create-archive-storage-temp
VihasMakwana Sep 27, 2024
924e002
Merge branch 'main' into create-archive-storage-temp
VihasMakwana Sep 30, 2024
3f762ed
chore: cleanup tracker and use options
VihasMakwana Sep 30, 2024
66824c6
fix: move function before loading
VihasMakwana Sep 30, 2024
ef2e53a
chore: log the error
VihasMakwana Sep 30, 2024
f009e71
chore: lint, ci
VihasMakwana Sep 30, 2024
79ce0e3
chore: remove redundant argument
VihasMakwana Sep 30, 2024
0dd23be
Merge branch 'cleanup-tracker-options' into create-archive-storage-temp
VihasMakwana Oct 2, 2024
cae05c7
chore: remove redundant code
VihasMakwana Oct 2, 2024
25d923b
chore: add new no tracking
VihasMakwana Oct 2, 2024
4bc2150
fix: pass persister instead of m.persister
VihasMakwana Oct 2, 2024
9006e3f
chore: remove options
VihasMakwana Oct 3, 2024
4413322
chore: fix tests
VihasMakwana Oct 3, 2024
637bfa5
Merge branch 'main' into create-archive-storage-temp
VihasMakwana Oct 3, 2024
79b48c1
initial read commit
VihasMakwana Oct 8, 2024
d2ae123
Merge branch 'main' into archive-read-logic
VihasMakwana Oct 11, 2024
e284eb1
improve readablity
VihasMakwana Oct 14, 2024
0b8a00d
improve readablity
VihasMakwana Oct 14, 2024
d00b376
comments
VihasMakwana Oct 15, 2024
2936928
lint
VihasMakwana Oct 22, 2024
fbf35c5
chore: improve logic
VihasMakwana Nov 1, 2024
cb59bc4
comments
VihasMakwana Nov 1, 2024
5560f6a
lint
VihasMakwana Nov 1, 2024
b45a4c0
use modulo
VihasMakwana Nov 6, 2024
200eadb
chore: remove record and update tests
VihasMakwana Nov 9, 2024
ad46f94
comments
VihasMakwana Nov 9, 2024
b9f65f3
lint and check
VihasMakwana Nov 10, 2024
90dadb0
lint and check
VihasMakwana Nov 10, 2024
2dac57f
gci
VihasMakwana Nov 11, 2024
9981c78
chore: cleanup, test cases
VihasMakwana Dec 2, 2024
fa48e76
lint
VihasMakwana Dec 2, 2024
a2437d9
Merge branch 'main' into archive-read-logic
VihasMakwana Dec 2, 2024
95199f2
chore: simplify
VihasMakwana Dec 4, 2024
08e3aad
chore: improve readability
VihasMakwana Dec 4, 2024
44a2e78
Merge branch 'main' into archive-read-logic
VihasMakwana Dec 4, 2024
df5fd11
fix: set nextIndex to t.archiveIndex
VihasMakwana Dec 4, 2024
da11fd0
Merge branch 'main' into archive-read-logic
VihasMakwana Dec 6, 2024
6 changes: 5 additions & 1 deletion pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go
@@ -47,7 +47,11 @@ func SaveKey(ctx context.Context, persister operator.Persister, rmds []*reader.M

// Load loads the most recent set of files to the database
func Load(ctx context.Context, persister operator.Persister) ([]*reader.Metadata, error) {
encoded, err := persister.Get(ctx, knownFilesKey)
return LoadKey(ctx, persister, knownFilesKey)
}

func LoadKey(ctx context.Context, persister operator.Persister, key string) ([]*reader.Metadata, error) {
encoded, err := persister.Get(ctx, key)
if err != nil {
return nil, err
}
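The Load/LoadKey split keeps Load's existing behaviour (it now delegates with the default knownFilesKey) while letting callers read checkpoints stored under arbitrary keys, such as the per-index archive keys used by the tracker below. A minimal sketch of the intended round trip, assuming ctx, persister, and a metadata slice ([]*reader.Metadata) are already in hand (illustrative only, not part of the diff):

key := fmt.Sprintf("knownFiles%d", 0) // same key format the tracker's archive uses
if err := checkpoint.SaveKey(ctx, persister, metadata, key); err != nil {
    // handle the write error
}
restored, err := checkpoint.LoadKey(ctx, persister, key) // restored mirrors what was saved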
74 changes: 72 additions & 2 deletions pkg/stanza/fileconsumer/internal/tracker/tracker.go
@@ -31,6 +31,7 @@ type Tracker interface {
EndPoll()
EndConsume() int
TotalReaders() int
FindFiles([]*fingerprint.Fingerprint) []*reader.Metadata
}

// fileTracker tracks known offsets for files that are being consumed by the manager.
@@ -164,13 +165,80 @@ func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) {
if t.pollsToArchive <= 0 || t.persister == nil {
return
}
key := fmt.Sprintf("knownFiles%d", t.archiveIndex)
if err := checkpoint.SaveKey(context.Background(), t.persister, metadata.Get(), key); err != nil {
if err := t.writeArchive(t.archiveIndex, metadata); err != nil {
t.set.Logger.Error("error faced while saving to the archive", zap.Error(err))
}
t.archiveIndex = (t.archiveIndex + 1) % t.pollsToArchive // increment the index
}

// readArchive loads data from the archive for a given index and returns a fileset.Fileset.
func (t *fileTracker) readArchive(index int) (*fileset.Fileset[*reader.Metadata], error) {
key := fmt.Sprintf("knownFiles%d", index)
metadata, err := checkpoint.LoadKey(context.Background(), t.persister, key)
if err != nil {
return nil, err
}
f := fileset.New[*reader.Metadata](len(metadata))
f.Add(metadata...)
return f, nil
}

// writeArchive saves data to the archive for a given index and returns an error, if encountered.
func (t *fileTracker) writeArchive(index int, rmds *fileset.Fileset[*reader.Metadata]) error {
key := fmt.Sprintf("knownFiles%d", index)
return checkpoint.SaveKey(context.Background(), t.persister, rmds.Get(), key)
}

// FindFiles goes through the archive, one fileset at a time, and tries to match all fingerprints against that loaded set.
func (t *fileTracker) FindFiles(fps []*fingerprint.Fingerprint) []*reader.Metadata {
// To minimize disk access, we load one archived fileset at a time, match the still-unmatched
// fingerprints against it, and write the fileset back only if matching modified it.
// We exit early once all fingerprints are matched.

// Track number of matched fingerprints so we can exit if all matched.
var numMatched int

// Determine the index for reading archive, starting from the most recent and moving towards the oldest
nextIndex := t.archiveIndex
matchedMetadata := make([]*reader.Metadata, len(fps))

// continue executing the loop until either all records are matched or all archive sets have been processed.
for i := 0; i < t.pollsToArchive; i++ {
// Step nextIndex back by one, moving from newer to older archive slots
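// Note: adding t.pollsToArchive before taking the modulo keeps the result non-negative,
// so the scan wraps around the ring. For example, with pollsToArchive = 4 and an
// archiveIndex of 1 (meaning index 0 was written most recently), the loop visits
// indices 0, 3, 2, 1, i.e. newest fileset first.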
nextIndex = (nextIndex - 1 + t.pollsToArchive) % t.pollsToArchive

data, err := t.readArchive(nextIndex) // we load one fileset at most once per poll
if err != nil {
t.set.Logger.Error("error while opening archive", zap.Error(err))
continue
}
archiveModified := false
for j, fp := range fps {
if matchedMetadata[j] != nil {
// we've already found a match for this index, continue
continue
}
if md := data.Match(fp, fileset.StartsWith); md != nil {
// update the matched metadata for the index
matchedMetadata[j] = md
archiveModified = true
numMatched++
}
}
if !archiveModified {
continue
}
// we save one fileset at most once per poll
if err := t.writeArchive(nextIndex, data); err != nil {
t.set.Logger.Error("error while opening archive", zap.Error(err))
}
// Check if all metadata have been found
if numMatched == len(fps) {
return matchedMetadata
}
}
return matchedMetadata
}

// noStateTracker only tracks the current polled files. Once the poll is
// complete and telemetry is consumed, the tracked files are closed. The next
// poll will create fresh readers with no previously tracked offsets.
@@ -225,3 +293,5 @@ func (t *noStateTracker) ClosePreviousFiles() int { return 0 }
func (t *noStateTracker) EndPoll() {}

func (t *noStateTracker) TotalReaders() int { return 0 }

func (t *noStateTracker) FindFiles([]*fingerprint.Fingerprint) []*reader.Metadata { return nil }
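The slice FindFiles returns is positionally aligned with its input: entry i holds the archived Metadata that matched fps[i] (a StartsWith fingerprint comparison), or nil when nothing in the archive matched. A caller-side sketch of how that result might be consumed (hypothetical consumer code, not taken from this PR):

matches := tracker.FindFiles(fps)
for i, md := range matches {
    if md == nil {
        // fps[i] has no archived state; treat it as a brand-new file
        continue
    }
    // fps[i] was seen before; resume from the archived metadata instead of starting over
    _ = md
}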
62 changes: 62 additions & 0 deletions pkg/stanza/fileconsumer/internal/tracker/tracker_test.go
@@ -0,0 +1,62 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package tracker // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker"

import (
"context"
"math/rand/v2"
"testing"

"github.com/google/uuid"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
)

func TestFindFilesOrder(t *testing.T) {
(Review comment from the PR author: Let me know your thoughts over this test @djaglowski.)

fps := make([]*fingerprint.Fingerprint, 0)
for i := 0; i < 100; i++ {
fps = append(fps, fingerprint.New([]byte(uuid.NewString())))
}
persister := testutil.NewUnscopedMockPersister()
fpInStorage := populatedPersisterData(persister, fps)

tracker := NewFileTracker(componenttest.NewNopTelemetrySettings(), 0, 100, persister)
matchables := tracker.FindFiles(fps)

require.Equal(t, len(fps), len(matchables), "return slice should be of same length as input slice")

for i := 0; i < len(matchables); i++ {
if fpInStorage[i] {
// if current fingerprint is present in storage, the corresponding return value should not be nil
require.NotNilf(t, matchables[i], "resulting index %d should not be nil", i)
require.Truef(t, fps[i].Equal(matchables[i].GetFingerprint()), "fingerprint at index %d is not equal to corresponding return value", i)
} else {
// if current fingerprint is absent from storage, the corresponding index should be empty i.e. "nil"
require.Nil(t, matchables[i], "resulting index %d should be of nil type", i)
}
}
}

func populatedPersisterData(persister operator.Persister, fps []*fingerprint.Fingerprint) []bool {
md := make([]*reader.Metadata, 0)

fpInStorage := make([]bool, len(fps))
for i, fp := range fps {
// 50-50 chance that a fingerprint exists in the storage
if rand.Float32() < 0.5 {
md = append(md, &reader.Metadata{Fingerprint: fp})
fpInStorage[i] = true // mark the fingerprint at index i in storage
}
}
// save half keys in knownFiles0 and other half in knownFiles1
_ = checkpoint.SaveKey(context.Background(), persister, md[:len(md)/2], "knownFiles0")
_ = checkpoint.SaveKey(context.Background(), persister, md[len(md)/2:], "knownFiles1")
return fpInStorage
}
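Both the tracker's archive and this test rely on the same key layout, knownFiles&lt;index&gt; with index running over [0, pollsToArchive). A tiny sketch of that naming scheme, shown only to make the layout explicit (the loop itself is hypothetical):

keys := make([]string, 0, pollsToArchive)
for i := 0; i < pollsToArchive; i++ {
    keys = append(keys, fmt.Sprintf("knownFiles%d", i))
}
// e.g. pollsToArchive = 2 would yield ["knownFiles0", "knownFiles1"]; the test above
// populates exactly those two keys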
2 changes: 1 addition & 1 deletion pkg/stanza/go.mod
@@ -8,6 +8,7 @@ require (
github.com/expr-lang/expr v1.16.9
github.com/fsnotify/fsnotify v1.8.0
github.com/goccy/go-json v0.10.3
github.com/google/uuid v1.6.0
github.com/jonboulle/clockwork v0.4.0
github.com/jpillora/backoff v1.0.0
github.com/json-iterator/go v1.1.12
@@ -50,7 +51,6 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect