diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index 4a2ebb0480c2..4f849a84e367 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -4,9 +4,7 @@ package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" import ( - "bytes" "context" - "encoding/json" "fmt" "os" "sync" @@ -14,6 +12,7 @@ import ( "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher" @@ -28,9 +27,9 @@ type Manager struct { readerFactory reader.Factory fileMatcher *matcher.Matcher roller roller - persister operator.Persister pollInterval time.Duration + persister operator.Persister maxBatches int maxBatchFiles int @@ -43,12 +42,20 @@ type Manager struct { func (m *Manager) Start(persister operator.Persister) error { ctx, cancel := context.WithCancel(context.Background()) m.cancel = cancel + m.persister = persister - // Load offsets from disk - if err := m.loadLastPollFiles(ctx); err != nil { + offsets, err := checkpoint.Load(ctx, m.persister) + if err != nil { return fmt.Errorf("read known files from database: %w", err) } + if len(offsets) > 0 { + m.Infow("Resuming from previously known offset(s). 'start_at' setting is not applicable.") + m.readerFactory.FromBeginning = true + for _, offset := range offsets { + m.knownFiles = append(m.knownFiles, &reader.Reader{Metadata: offset}) + } + } if _, err := m.fileMatcher.MatchFiles(); err != nil { m.Warnf("finding files: %v", err) @@ -150,7 +157,15 @@ func (m *Manager) consume(ctx context.Context, paths []string) { m.roller.roll(ctx, readers) m.saveCurrent(readers) - m.syncLastPollFiles(ctx) + + rmds := make([]*reader.Metadata, 0, len(readers)) + for _, r := range readers { + rmds = append(rmds, r.Metadata) + } + if err := checkpoint.Save(ctx, m.persister, rmds); err != nil { + m.Errorw("save offsets", zap.Error(err)) + } + m.clearCurrentFingerprints() } @@ -263,80 +278,3 @@ func (m *Manager) findFingerprintMatch(fp *fingerprint.Fingerprint) (*reader.Rea } return nil, false } - -const knownFilesKey = "knownFiles" - -// syncLastPollFiles syncs the most recent set of files to the database -func (m *Manager) syncLastPollFiles(ctx context.Context) { - var buf bytes.Buffer - enc := json.NewEncoder(&buf) - - // Encode the number of known files - if err := enc.Encode(len(m.knownFiles)); err != nil { - m.Errorw("Failed to encode known files", zap.Error(err)) - return - } - - // Encode each known file - for _, fileReader := range m.knownFiles { - if err := enc.Encode(fileReader.Metadata); err != nil { - m.Errorw("Failed to encode known files", zap.Error(err)) - } - } - - if err := m.persister.Set(ctx, knownFilesKey, buf.Bytes()); err != nil { - m.Errorw("Failed to sync to database", zap.Error(err)) - } -} - -// syncLastPollFiles loads the most recent set of files to the database -func (m *Manager) loadLastPollFiles(ctx context.Context) error { - encoded, err := m.persister.Get(ctx, knownFilesKey) - if err != nil { - return err - } - - if encoded == nil { - return nil - } - - dec := json.NewDecoder(bytes.NewReader(encoded)) - - // Decode the number of entries - var knownFileCount int - if err = dec.Decode(&knownFileCount); err != nil { - return fmt.Errorf("decoding file count: %w", err) - } - - if knownFileCount > 0 { - m.Infow("Resuming from previously known offset(s). 'start_at' setting is not applicable.") - m.readerFactory.FromBeginning = true - } - - // Decode each of the known files - for i := 0; i < knownFileCount; i++ { - rmd := new(reader.Metadata) - if err = dec.Decode(rmd); err != nil { - return err - } - - // Migrate readers that used FileAttributes.HeaderAttributes - // This block can be removed in a future release, tentatively v0.90.0 - if ha, ok := rmd.FileAttributes["HeaderAttributes"]; ok { - switch hat := ha.(type) { - case map[string]any: - for k, v := range hat { - rmd.FileAttributes[k] = v - } - delete(rmd.FileAttributes, "HeaderAttributes") - default: - m.Errorw("migrate header attributes: unexpected format") - } - } - - // This reader won't be used for anything other than metadata reference, so just wrap the metadata - m.knownFiles = append(m.knownFiles, &reader.Reader{Metadata: rmd}) - } - - return nil -} diff --git a/pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go b/pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go new file mode 100644 index 000000000000..964a2e324c09 --- /dev/null +++ b/pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go @@ -0,0 +1,91 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package checkpoint // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint" + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" +) + +const knownFilesKey = "knownFiles" + +// Save syncs the most recent set of files to the database +func Save(ctx context.Context, persister operator.Persister, rmds []*reader.Metadata) error { + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + + // Encode the number of known files + if err := enc.Encode(len(rmds)); err != nil { + return fmt.Errorf("encode num files: %w", err) + } + + var errs []error + // Encode each known file + for _, rmd := range rmds { + if err := enc.Encode(rmd); err != nil { + errs = append(errs, fmt.Errorf("encode metadata: %w", err)) + } + } + + if err := persister.Set(ctx, knownFilesKey, buf.Bytes()); err != nil { + errs = append(errs, fmt.Errorf("persist known files: %w", err)) + } + + return errors.Join(errs...) +} + +// Load loads the most recent set of files to the database +func Load(ctx context.Context, persister operator.Persister) ([]*reader.Metadata, error) { + encoded, err := persister.Get(ctx, knownFilesKey) + if err != nil { + return nil, err + } + + if encoded == nil { + return []*reader.Metadata{}, nil + } + + dec := json.NewDecoder(bytes.NewReader(encoded)) + + // Decode the number of entries + var knownFileCount int + if err = dec.Decode(&knownFileCount); err != nil { + return nil, fmt.Errorf("decoding file count: %w", err) + } + + // Decode each of the known files + var errs []error + rmds := make([]*reader.Metadata, 0, knownFileCount) + for i := 0; i < knownFileCount; i++ { + rmd := new(reader.Metadata) + if err = dec.Decode(rmd); err != nil { + return nil, err + } + + // Migrate readers that used FileAttributes.HeaderAttributes + // This block can be removed in a future release, tentatively v0.90.0 + if ha, ok := rmd.FileAttributes["HeaderAttributes"]; ok { + switch hat := ha.(type) { + case map[string]any: + for k, v := range hat { + rmd.FileAttributes[k] = v + } + delete(rmd.FileAttributes, "HeaderAttributes") + default: + errs = append(errs, errors.New("migrate header attributes: unexpected format")) + } + } + + // This reader won't be used for anything other than metadata reference, so just wrap the metadata + rmds = append(rmds, rmd) + } + + return rmds, errors.Join(errs...) +} diff --git a/pkg/stanza/fileconsumer/internal/checkpoint/checkpoint_test.go b/pkg/stanza/fileconsumer/internal/checkpoint/checkpoint_test.go new file mode 100644 index 000000000000..406a8b262bd2 --- /dev/null +++ b/pkg/stanza/fileconsumer/internal/checkpoint/checkpoint_test.go @@ -0,0 +1,149 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package checkpoint + +import ( + "bytes" + "context" + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" +) + +func TestLoadNothing(t *testing.T) { + reloaded, err := Load(context.Background(), testutil.NewUnscopedMockPersister()) + assert.NoError(t, err) + assert.Equal(t, []*reader.Metadata{}, reloaded) +} + +func TestSaveErr(t *testing.T) { + assert.Error(t, Save(context.Background(), + testutil.NewErrPersister(map[string]error{ + "knownFiles": assert.AnError, + }), []*reader.Metadata{})) +} + +func TestLoadErr(t *testing.T) { + _, err := Load(context.Background(), + testutil.NewErrPersister(map[string]error{ + "knownFiles": assert.AnError, + })) + assert.Error(t, err) +} + +func TestNopEncodingDifferentLogSizes(t *testing.T) { + testCases := []struct { + name string + rmds []*reader.Metadata + }{ + { + "empty", + []*reader.Metadata{}, + }, + { + "one", + []*reader.Metadata{ + { + Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("foo")}, + Offset: 3, + }, + }, + }, + { + "two", + []*reader.Metadata{ + { + Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("foo")}, + Offset: 3, + }, + { + Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("barrrr")}, + Offset: 6, + }, + }, + }, + { + "other_fields", + []*reader.Metadata{ + { + Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("foo")}, + Offset: 3, + FileAttributes: map[string]interface{}{ + "hello": "world", + }, + }, + { + Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("barrrr")}, + Offset: 6, + HeaderFinalized: true, + }, + { + Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("ab")}, + Offset: 2, + FileAttributes: map[string]interface{}{ + "hello2": "world2", + }, + HeaderFinalized: true, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + p := testutil.NewUnscopedMockPersister() + assert.NoError(t, Save(context.Background(), p, tc.rmds)) + reloaded, err := Load(context.Background(), p) + assert.NoError(t, err) + assert.Equal(t, tc.rmds, reloaded) + }) + } +} + +type deprecatedMetadata struct { + reader.Metadata + HeaderAttributes map[string]any +} + +func TestMigrateHeaderAttributes(t *testing.T) { + p := testutil.NewUnscopedMockPersister() + saveDeprecated(t, p, &deprecatedMetadata{ + Metadata: reader.Metadata{ + Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("foo")}, + Offset: 3, + FileAttributes: map[string]any{ + "HeaderAttributes": map[string]any{ + "hello": "world", + }, + }, + }, + }) + reloaded, err := Load(context.Background(), p) + assert.NoError(t, err) + assert.Equal(t, []*reader.Metadata{ + { + Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("foo")}, + Offset: 3, + FileAttributes: map[string]interface{}{ + "hello": "world", + }, + }, + }, reloaded) + +} + +func saveDeprecated(t *testing.T, persister operator.Persister, dep *deprecatedMetadata) { + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + require.NoError(t, enc.Encode(1)) + require.NoError(t, enc.Encode(dep)) + require.NoError(t, persister.Set(context.Background(), knownFilesKey, buf.Bytes())) +} diff --git a/pkg/stanza/testutil/util.go b/pkg/stanza/testutil/util.go index 948c4406b739..eace557ebacb 100644 --- a/pkg/stanza/testutil/util.go +++ b/pkg/stanza/testutil/util.go @@ -24,17 +24,24 @@ func Logger(t testing.TB) *zap.SugaredLogger { type mockPersister struct { data map[string][]byte dataMux sync.Mutex + errKeys map[string]error } func (p *mockPersister) Get(_ context.Context, k string) ([]byte, error) { p.dataMux.Lock() defer p.dataMux.Unlock() + if _, ok := p.errKeys[k]; ok { + return nil, p.errKeys[k] + } return p.data[k], nil } func (p *mockPersister) Set(_ context.Context, k string, v []byte) error { p.dataMux.Lock() defer p.dataMux.Unlock() + if _, ok := p.errKeys[k]; ok { + return p.errKeys[k] + } p.data[k] = v return nil } @@ -42,6 +49,9 @@ func (p *mockPersister) Set(_ context.Context, k string, v []byte) error { func (p *mockPersister) Delete(_ context.Context, k string) error { p.dataMux.Lock() defer p.dataMux.Unlock() + if _, ok := p.errKeys[k]; ok { + return p.errKeys[k] + } delete(p.data, k) return nil } @@ -52,11 +62,17 @@ func NewUnscopedMockPersister() operator.Persister { return &mockPersister{data: data} } -// NewMockPersister will return a new persister for testing func NewMockPersister(scope string) operator.Persister { return operator.NewScopedPersister(scope, NewUnscopedMockPersister()) } +// NewErrPersister will return a new persister for testing +// which will return an error if any of the specified keys are used +func NewErrPersister(errKeys map[string]error) operator.Persister { + data := make(map[string][]byte) + return &mockPersister{data: data, errKeys: errKeys} +} + // Trim removes white space from the lines of a string func Trim(s string) string { lines := strings.Split(s, "\n")