diff --git a/pkg/stanza/fileconsumer/internal/reader/factory_test.go b/pkg/stanza/fileconsumer/internal/reader/factory_test.go new file mode 100644 index 000000000000..7d46593d50ee --- /dev/null +++ b/pkg/stanza/fileconsumer/internal/reader/factory_test.go @@ -0,0 +1,49 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package reader + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/emittest" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" +) + +const ( + defaultMaxLogSize = 1024 * 1024 + defaultMaxConcurrentFiles = 1024 + defaultEncoding = "utf-8" + defaultPollInterval = 200 * time.Millisecond + defaultFlushPeriod = 500 * time.Millisecond +) + +func testFactory(t *testing.T, sCfg split.Config, maxLogSize int, flushPeriod time.Duration) (*Factory, *emittest.Sink) { + enc, err := decode.LookupEncoding(defaultEncoding) + require.NoError(t, err) + + splitFunc, err := sCfg.Func(enc, false, maxLogSize) + require.NoError(t, err) + + sink := emittest.NewSink() + return &Factory{ + SugaredLogger: testutil.Logger(t), + Config: &Config{ + FingerprintSize: fingerprint.DefaultSize, + MaxLogSize: maxLogSize, + Emit: sink.Callback, + FlushTimeout: flushPeriod, + }, + FromBeginning: true, + Encoding: enc, + SplitFunc: splitFunc, + TrimFunc: trim.Whitespace, + }, sink +} diff --git a/pkg/stanza/fileconsumer/reader_test.go b/pkg/stanza/fileconsumer/internal/reader/split_test.go similarity index 77% rename from pkg/stanza/fileconsumer/reader_test.go rename to pkg/stanza/fileconsumer/internal/reader/split_test.go index 7b28b67f910e..194c924d88d5 100644 --- a/pkg/stanza/fileconsumer/reader_test.go +++ b/pkg/stanza/fileconsumer/internal/reader/split_test.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package fileconsumer +package reader import ( "context" @@ -12,21 +12,16 @@ import ( "github.com/stretchr/testify/require" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/emittest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/filetest" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) func TestPersistFlusher(t *testing.T) { flushPeriod := 100 * time.Millisecond - f, sink := testReaderFactory(t, split.Config{}, defaultMaxLogSize, flushPeriod) + f, sink := testFactory(t, split.Config{}, defaultMaxLogSize, flushPeriod) temp := filetest.OpenTemp(t, t.TempDir()) fp, err := f.NewFingerprint(temp) @@ -112,7 +107,7 @@ func TestTokenization(t *testing.T) { for _, tc := range testCases { t.Run(tc.testName, func(t *testing.T) { - f, sink := testReaderFactory(t, split.Config{}, defaultMaxLogSize, defaultFlushPeriod) + f, sink := testFactory(t, split.Config{}, defaultMaxLogSize, defaultFlushPeriod) temp := filetest.OpenTemp(t, t.TempDir()) _, err := temp.Write(tc.fileContent) @@ -142,7 +137,7 @@ func TestTokenizationTooLong(t *testing.T) { []byte("aaa"), } - f, sink := testReaderFactory(t, split.Config{}, 10, defaultFlushPeriod) + f, sink := testFactory(t, split.Config{}, 10, defaultFlushPeriod) temp := filetest.OpenTemp(t, t.TempDir()) _, err := temp.Write(fileContent) @@ -174,7 +169,7 @@ func TestTokenizationTooLongWithLineStartPattern(t *testing.T) { sCfg := split.Config{} sCfg.LineStartPattern = `\d+-\d+-\d+` - f, sink := testReaderFactory(t, sCfg, 15, defaultFlushPeriod) + f, sink := testFactory(t, sCfg, 15, defaultFlushPeriod) temp := filetest.OpenTemp(t, t.TempDir()) _, err := temp.Write(fileContent) @@ -196,7 +191,7 @@ func TestTokenizationTooLongWithLineStartPattern(t *testing.T) { func TestHeaderFingerprintIncluded(t *testing.T) { fileContent := []byte("#header-line\naaa\n") - f, _ := testReaderFactory(t, split.Config{}, 10, defaultFlushPeriod) + f, _ := testFactory(t, split.Config{}, 10, defaultFlushPeriod) regexConf := regex.NewConfig() regexConf.Regex = "^#(?P
.*)" @@ -223,26 +218,3 @@ func TestHeaderFingerprintIncluded(t *testing.T) { require.Equal(t, []byte("#header-line\naaa\n"), r.Fingerprint.FirstBytes) } - -func testReaderFactory(t *testing.T, sCfg split.Config, maxLogSize int, flushPeriod time.Duration) (*reader.Factory, *emittest.Sink) { - enc, err := decode.LookupEncoding(defaultEncoding) - require.NoError(t, err) - - splitFunc, err := sCfg.Func(enc, false, maxLogSize) - require.NoError(t, err) - - sink := emittest.NewSink() - return &reader.Factory{ - SugaredLogger: testutil.Logger(t), - Config: &reader.Config{ - FingerprintSize: fingerprint.DefaultSize, - MaxLogSize: maxLogSize, - Emit: sink.Callback, - FlushTimeout: flushPeriod, - }, - FromBeginning: true, - Encoding: enc, - SplitFunc: splitFunc, - TrimFunc: trim.Whitespace, - }, sink -}