From be932115782f5731ad2c9ae87dc8a50658c497ad Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Wed, 6 Dec 2023 07:30:08 -0800 Subject: [PATCH] [chore][pkg/stanza] Remove reader config (#29668) `reader.Config` contained many fields which were only relevant to `reader.Factory`. After moving these fields to `reader.Factory`, it became apparent that the `reader.Reader` struct only needs access to a few specific fields (`fingerprintSize`, `maxLogSize`, `deleteAtEOF`, `emitFunc`). This also contains some corresponding cleanup of the fields within the factory struct. --- pkg/stanza/fileconsumer/config.go | 70 ++++++++----------- pkg/stanza/fileconsumer/config_test.go | 15 ++-- pkg/stanza/fileconsumer/file_test.go | 7 +- .../fileconsumer/internal/reader/factory.go | 63 +++++++++++------ .../internal/reader/factory_test.go | 24 +++---- .../fileconsumer/internal/reader/reader.go | 52 ++++++-------- 6 files changed, 115 insertions(+), 116 deletions(-) diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index 41cbfa4e7451..70f5175e9d20 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -26,11 +26,9 @@ import ( ) const ( - defaultMaxLogSize = 1024 * 1024 defaultMaxConcurrentFiles = 1024 defaultEncoding = "utf-8" defaultPollInterval = 200 * time.Millisecond - defaultFlushPeriod = 500 * time.Millisecond ) var allowFileDeletion = featuregate.GlobalRegistry().MustRegister( @@ -50,40 +48,36 @@ var AllowHeaderMetadataParsing = featuregate.GlobalRegistry().MustRegister( // NewConfig creates a new input config with default values func NewConfig() *Config { return &Config{ - IncludeFileName: true, - IncludeFilePath: false, - IncludeFileNameResolved: false, - IncludeFilePathResolved: false, - PollInterval: defaultPollInterval, - Encoding: defaultEncoding, - StartAt: "end", - FingerprintSize: fingerprint.DefaultSize, - MaxLogSize: defaultMaxLogSize, - MaxConcurrentFiles: defaultMaxConcurrentFiles, - MaxBatches: 0, - 
FlushPeriod: defaultFlushPeriod, + PollInterval: defaultPollInterval, + MaxConcurrentFiles: defaultMaxConcurrentFiles, + StartAt: "end", + FingerprintSize: fingerprint.DefaultSize, + MaxLogSize: reader.DefaultMaxLogSize, + Encoding: defaultEncoding, + FlushPeriod: reader.DefaultFlushPeriod, + IncludeFileName: true, } } // Config is the configuration of a file input operator type Config struct { matcher.Criteria `mapstructure:",squash"` - IncludeFileName bool `mapstructure:"include_file_name,omitempty"` - IncludeFilePath bool `mapstructure:"include_file_path,omitempty"` - IncludeFileNameResolved bool `mapstructure:"include_file_name_resolved,omitempty"` - IncludeFilePathResolved bool `mapstructure:"include_file_path_resolved,omitempty"` PollInterval time.Duration `mapstructure:"poll_interval,omitempty"` + MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty"` + MaxBatches int `mapstructure:"max_batches,omitempty"` StartAt string `mapstructure:"start_at,omitempty"` FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty"` MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty"` - MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty"` - MaxBatches int `mapstructure:"max_batches,omitempty"` - DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"` + Encoding string `mapstructure:"encoding,omitempty"` SplitConfig split.Config `mapstructure:"multiline,omitempty"` TrimConfig trim.Config `mapstructure:",squash,omitempty"` - Encoding string `mapstructure:"encoding,omitempty"` FlushPeriod time.Duration `mapstructure:"force_flush_period,omitempty"` + IncludeFileName bool `mapstructure:"include_file_name,omitempty"` + IncludeFilePath bool `mapstructure:"include_file_path,omitempty"` + IncludeFileNameResolved bool `mapstructure:"include_file_name_resolved,omitempty"` + IncludeFilePathResolved bool `mapstructure:"include_file_path_resolved,omitempty"` Header *HeaderConfig `mapstructure:"header,omitempty"` + 
DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"` } type HeaderConfig struct { @@ -159,23 +153,21 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, spli SugaredLogger: logger.With("component", "fileconsumer"), cancel: func() {}, readerFactory: reader.Factory{ - SugaredLogger: logger.With("component", "fileconsumer"), - Config: &reader.Config{ - FingerprintSize: int(c.FingerprintSize), - MaxLogSize: int(c.MaxLogSize), - Emit: emit, - IncludeFileName: c.IncludeFileName, - IncludeFilePath: c.IncludeFilePath, - IncludeFileNameResolved: c.IncludeFileNameResolved, - IncludeFilePathResolved: c.IncludeFilePathResolved, - DeleteAtEOF: c.DeleteAfterRead, - FlushTimeout: c.FlushPeriod, - }, - FromBeginning: startAtBeginning, - Encoding: enc, - SplitFunc: splitFunc, - TrimFunc: trimFunc, - HeaderConfig: hCfg, + SugaredLogger: logger.With("component", "fileconsumer"), + FromBeginning: startAtBeginning, + FingerprintSize: int(c.FingerprintSize), + MaxLogSize: int(c.MaxLogSize), + Encoding: enc, + SplitFunc: splitFunc, + TrimFunc: trimFunc, + FlushTimeout: c.FlushPeriod, + EmitFunc: emit, + IncludeFileName: c.IncludeFileName, + IncludeFilePath: c.IncludeFilePath, + IncludeFileNameResolved: c.IncludeFileNameResolved, + IncludeFilePathResolved: c.IncludeFilePathResolved, + HeaderConfig: hCfg, + DeleteAtEOF: c.DeleteAfterRead, }, fileMatcher: fileMatcher, pollInterval: c.PollInterval, diff --git a/pkg/stanza/fileconsumer/config_test.go b/pkg/stanza/fileconsumer/config_test.go index 7a32b81386ac..6a95947ff6ef 100644 --- a/pkg/stanza/fileconsumer/config_test.go +++ b/pkg/stanza/fileconsumer/config_test.go @@ -16,6 +16,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/emittest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" 
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" @@ -26,17 +27,17 @@ import ( func TestNewConfig(t *testing.T) { cfg := NewConfig() + assert.Equal(t, 200*time.Millisecond, cfg.PollInterval) + assert.Equal(t, defaultMaxConcurrentFiles, cfg.MaxConcurrentFiles) + assert.Equal(t, "end", cfg.StartAt) + assert.Equal(t, fingerprint.DefaultSize, int(cfg.FingerprintSize)) + assert.Equal(t, defaultEncoding, cfg.Encoding) + assert.Equal(t, reader.DefaultMaxLogSize, int(cfg.MaxLogSize)) + assert.Equal(t, reader.DefaultFlushPeriod, cfg.FlushPeriod) assert.True(t, cfg.IncludeFileName) assert.False(t, cfg.IncludeFilePath) assert.False(t, cfg.IncludeFileNameResolved) assert.False(t, cfg.IncludeFilePathResolved) - assert.Equal(t, "end", cfg.StartAt) - assert.Equal(t, 200*time.Millisecond, cfg.PollInterval) - assert.Equal(t, fingerprint.DefaultSize, int(cfg.FingerprintSize)) - assert.Equal(t, defaultEncoding, cfg.Encoding) - assert.Equal(t, defaultMaxLogSize, int(cfg.MaxLogSize)) - assert.Equal(t, defaultMaxConcurrentFiles, cfg.MaxConcurrentFiles) - assert.Equal(t, defaultFlushPeriod, cfg.FlushPeriod) } func TestUnmarshal(t *testing.T) { diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index 9667ac59b4a5..93d9481eac89 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ b/pkg/stanza/fileconsumer/file_test.go @@ -21,6 +21,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/attrs" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/emittest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/filetest" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" 
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" @@ -49,7 +50,7 @@ func TestDefaultBehaviors(t *testing.T) { }() // Should not emit the pre-existing token, even after flush period - sink.ExpectNoCallsUntil(t, defaultFlushPeriod) + sink.ExpectNoCallsUntil(t, reader.DefaultFlushPeriod) // Complete token should be emitted quickly filetest.WriteString(t, temp, " testlog2 \n") @@ -60,8 +61,8 @@ func TestDefaultBehaviors(t *testing.T) { // Incomplete token should not be emitted until after flush period filetest.WriteString(t, temp, " testlog3 ") - sink.ExpectNoCallsUntil(t, defaultFlushPeriod/2) - time.Sleep(defaultFlushPeriod) + sink.ExpectNoCallsUntil(t, reader.DefaultFlushPeriod/2) + time.Sleep(reader.DefaultFlushPeriod) token, attributes = sink.NextCall(t) assert.Equal(t, []byte("testlog3"), token) diff --git a/pkg/stanza/fileconsumer/internal/reader/factory.go b/pkg/stanza/fileconsumer/internal/reader/factory.go index c083122e42fe..167acf98e63c 100644 --- a/pkg/stanza/fileconsumer/internal/reader/factory.go +++ b/pkg/stanza/fileconsumer/internal/reader/factory.go @@ -15,29 +15,43 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/attrs" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) +const ( + DefaultMaxLogSize = 1024 * 1024 + DefaultFlushPeriod = 500 * 
time.Millisecond +) + type Factory struct { *zap.SugaredLogger - Config *Config - FromBeginning bool - Encoding encoding.Encoding - HeaderConfig *header.Config - SplitFunc bufio.SplitFunc - TrimFunc trim.Func + HeaderConfig *header.Config + FromBeginning bool + FingerprintSize int + MaxLogSize int + Encoding encoding.Encoding + SplitFunc bufio.SplitFunc + TrimFunc trim.Func + FlushTimeout time.Duration + EmitFunc emit.Callback + IncludeFileName bool + IncludeFilePath bool + IncludeFileNameResolved bool + IncludeFilePathResolved bool + DeleteAtEOF bool } func (f *Factory) NewFingerprint(file *os.File) (*fingerprint.Fingerprint, error) { - return fingerprint.New(file, f.Config.FingerprintSize) + return fingerprint.New(file, f.FingerprintSize) } func (f *Factory) NewReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader, error) { m := &Metadata{Fingerprint: fp, FileAttributes: map[string]any{}} - if f.Config.FlushTimeout > 0 { + if f.FlushTimeout > 0 { m.FlushState = &flush.State{LastDataChange: time.Now()} } return f.NewReaderFromMetadata(file, m) @@ -45,17 +59,19 @@ func (f *Factory) NewReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader, err error) { r = &Reader{ - Config: f.Config, - Metadata: m, - file: file, - fileName: file.Name(), - logger: f.SugaredLogger.With("path", file.Name()), - decoder: decode.New(f.Encoding), - lineSplitFunc: f.SplitFunc, + Metadata: m, + logger: f.SugaredLogger.With("path", file.Name()), + file: file, + fileName: file.Name(), + fingerprintSize: f.FingerprintSize, + maxLogSize: f.MaxLogSize, + decoder: decode.New(f.Encoding), + lineSplitFunc: f.SplitFunc, + deleteAtEOF: f.DeleteAtEOF, } - flushFunc := m.FlushState.Func(f.SplitFunc, f.Config.FlushTimeout) - r.lineSplitFunc = trim.WithFunc(trim.ToLength(flushFunc, f.Config.MaxLogSize), f.TrimFunc) + flushFunc := m.FlushState.Func(f.SplitFunc, f.FlushTimeout) + r.lineSplitFunc = 
trim.WithFunc(trim.ToLength(flushFunc, f.MaxLogSize), f.TrimFunc) if !f.FromBeginning { if err = r.offsetToEnd(); err != nil { @@ -63,15 +79,16 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader, } } + r.emitFunc = f.EmitFunc if f.HeaderConfig == nil || m.HeaderFinalized { r.splitFunc = r.lineSplitFunc - r.processFunc = f.Config.Emit + r.processFunc = r.emitFunc } else { - r.splitFunc = f.HeaderConfig.SplitFunc r.headerReader, err = header.NewReader(f.SugaredLogger, *f.HeaderConfig) if err != nil { return nil, err } + r.splitFunc = f.HeaderConfig.SplitFunc r.processFunc = r.headerReader.Process } @@ -91,22 +108,22 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader, f.Errorf("resolve abs: %w", err) } - if f.Config.IncludeFileName { + if f.IncludeFileName { r.FileAttributes[attrs.LogFileName] = filepath.Base(r.fileName) } else if r.FileAttributes[attrs.LogFileName] != nil { delete(r.FileAttributes, attrs.LogFileName) } - if f.Config.IncludeFilePath { + if f.IncludeFilePath { r.FileAttributes[attrs.LogFilePath] = r.fileName } else if r.FileAttributes[attrs.LogFilePath] != nil { delete(r.FileAttributes, attrs.LogFilePath) } - if f.Config.IncludeFileNameResolved { + if f.IncludeFileNameResolved { r.FileAttributes[attrs.LogFileNameResolved] = filepath.Base(abs) } else if r.FileAttributes[attrs.LogFileNameResolved] != nil { delete(r.FileAttributes, attrs.LogFileNameResolved) } - if f.Config.IncludeFilePathResolved { + if f.IncludeFilePathResolved { r.FileAttributes[attrs.LogFilePathResolved] = abs } else if r.FileAttributes[attrs.LogFilePathResolved] != nil { delete(r.FileAttributes, attrs.LogFilePathResolved) diff --git a/pkg/stanza/fileconsumer/internal/reader/factory_test.go b/pkg/stanza/fileconsumer/internal/reader/factory_test.go index e2b3b83c532e..3e9057c2c64c 100644 --- a/pkg/stanza/fileconsumer/internal/reader/factory_test.go +++ b/pkg/stanza/fileconsumer/internal/reader/factory_test.go @@ -25,8 +25,8 
@@ const ( func testFactory(t *testing.T, opts ...testFactoryOpt) (*Factory, *emittest.Sink) { cfg := &testFactoryCfg{ - fingerprintSize: fingerprint.DefaultSize, fromBeginning: true, + fingerprintSize: fingerprint.DefaultSize, maxLogSize: defaultMaxLogSize, encoding: unicode.UTF8, trimFunc: trim.Whitespace, @@ -42,25 +42,23 @@ func testFactory(t *testing.T, opts ...testFactoryOpt) (*Factory, *emittest.Sink sink := emittest.NewSink(emittest.WithCallBuffer(cfg.sinkCallBufferSize)) return &Factory{ - SugaredLogger: testutil.Logger(t), - Config: &Config{ - FingerprintSize: cfg.fingerprintSize, - MaxLogSize: cfg.maxLogSize, - FlushTimeout: cfg.flushPeriod, - Emit: sink.Callback, - }, - FromBeginning: cfg.fromBeginning, - Encoding: cfg.encoding, - SplitFunc: splitFunc, - TrimFunc: cfg.trimFunc, + SugaredLogger: testutil.Logger(t), + FromBeginning: cfg.fromBeginning, + FingerprintSize: cfg.fingerprintSize, + MaxLogSize: cfg.maxLogSize, + Encoding: cfg.encoding, + SplitFunc: splitFunc, + TrimFunc: cfg.trimFunc, + FlushTimeout: cfg.flushPeriod, + EmitFunc: sink.Callback, }, sink } type testFactoryOpt func(*testFactoryCfg) type testFactoryCfg struct { - fingerprintSize int fromBeginning bool + fingerprintSize int maxLogSize int encoding encoding.Encoding splitCfg split.Config diff --git a/pkg/stanza/fileconsumer/internal/reader/reader.go b/pkg/stanza/fileconsumer/internal/reader/reader.go index ef5e2e7d5bc9..7bd2311d8138 100644 --- a/pkg/stanza/fileconsumer/internal/reader/reader.go +++ b/pkg/stanza/fileconsumer/internal/reader/reader.go @@ -9,7 +9,6 @@ import ( "errors" "fmt" "os" - "time" "go.uber.org/zap" @@ -21,18 +20,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush" ) -type Config struct { - FingerprintSize int - MaxLogSize int - Emit emit.Callback - IncludeFileName bool - IncludeFilePath bool - IncludeFileNameResolved bool - IncludeFilePathResolved bool - DeleteAtEOF bool - FlushTimeout time.Duration -} - type Metadata struct { 
Fingerprint *fingerprint.Fingerprint Offset int64 @@ -43,16 +30,19 @@ type Metadata struct { // Reader manages a single file type Reader struct { - *Config *Metadata - fileName string - logger *zap.SugaredLogger - file *os.File - lineSplitFunc bufio.SplitFunc - splitFunc bufio.SplitFunc - decoder *decode.Decoder - headerReader *header.Reader - processFunc emit.Callback + logger *zap.SugaredLogger + fileName string + file *os.File + fingerprintSize int + maxLogSize int + lineSplitFunc bufio.SplitFunc + splitFunc bufio.SplitFunc + decoder *decode.Decoder + headerReader *header.Reader + processFunc emit.Callback + emitFunc emit.Callback + deleteAtEOF bool } // offsetToEnd sets the starting offset @@ -69,7 +59,7 @@ func (r *Reader) NewFingerprintFromFile() (*fingerprint.Fingerprint, error) { if r.file == nil { return nil, errors.New("file is nil") } - return fingerprint.New(r.file, r.FingerprintSize) + return fingerprint.New(r.file, r.fingerprintSize) } // ReadToEnd will read until the end of the file @@ -79,7 +69,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) { return } - s := scanner.New(r, r.MaxLogSize, scanner.DefaultBufferSize, r.Offset, r.splitFunc) + s := scanner.New(r, r.maxLogSize, scanner.DefaultBufferSize, r.Offset, r.splitFunc) // Iterate over the tokenized file, emitting entries as we go for { @@ -93,7 +83,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) { if !ok { if err := s.Error(); err != nil { r.logger.Errorw("Failed during scan", zap.Error(err)) - } else if r.DeleteAtEOF { + } else if r.deleteAtEOF { r.delete() } break @@ -111,12 +101,12 @@ func (r *Reader) ReadToEnd(ctx context.Context) { // Do not use the updated offset from the old scanner, as the most recent token // could be split differently with the new splitter. 
r.splitFunc = r.lineSplitFunc - r.processFunc = r.Emit + r.processFunc = r.emitFunc if _, err = r.file.Seek(r.Offset, 0); err != nil { r.logger.Errorw("Failed to seek post-header", zap.Error(err)) return } - s = scanner.New(r, r.MaxLogSize, scanner.DefaultBufferSize, r.Offset, r.splitFunc) + s = scanner.New(r, r.maxLogSize, scanner.DefaultBufferSize, r.Offset, r.splitFunc) } else { r.logger.Errorw("process: %w", zap.Error(err)) } @@ -164,12 +154,12 @@ func (r *Reader) Close() *Metadata { func (r *Reader) Read(dst []byte) (int, error) { // Skip if fingerprint is already built // or if fingerprint is behind Offset - if len(r.Fingerprint.FirstBytes) == r.FingerprintSize || int(r.Offset) > len(r.Fingerprint.FirstBytes) { + if len(r.Fingerprint.FirstBytes) == r.fingerprintSize || int(r.Offset) > len(r.Fingerprint.FirstBytes) { return r.file.Read(dst) } n, err := r.file.Read(dst) - appendCount := min0(n, r.FingerprintSize-int(r.Offset)) - // return for n == 0 or r.Offset >= r.FingerprintSize + appendCount := min0(n, r.fingerprintSize-int(r.Offset)) + // return for n == 0 or r.Offset >= r.fingerprintSize if appendCount == 0 { return n, err } @@ -198,7 +188,7 @@ func (r *Reader) Validate() bool { if r.file == nil { return false } - refreshedFingerprint, err := fingerprint.New(r.file, r.FingerprintSize) + refreshedFingerprint, err := fingerprint.New(r.file, r.fingerprintSize) if err != nil { return false }