Skip to content

Commit

Permalink
fix: reduce redundancy test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
Vihas Splunk committed Sep 16, 2023
1 parent 89bb07a commit cae3290
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 32 deletions.
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func BenchmarkFileSizeVarying(b *testing.B) {
cfg.MaxConcurrentFiles = fileSize.maxConcurrent
emitCalls := make(chan *emitParams, max(fileSize.logs...)*b.N) // large enough to hold all the logs

operator, _ := buildTestManager(b, cfg, withEmitChan(emitCalls), withReaderChan())
operator, _ := buildTestManager(b, cfg, withEmitChan(emitCalls))
operator.persister = testutil.NewMockPersister("test")
defer func() {
require.NoError(b, operator.Stop())
Expand Down
5 changes: 4 additions & 1 deletion pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type Manager struct {
// Following fields are used only when useThreadPool is enabled
knownFilesLock sync.RWMutex
trieLock sync.RWMutex
once sync.Once

workerWg sync.WaitGroup
readerChan chan readerEnvelope
Expand All @@ -74,7 +75,9 @@ func (m *Manager) Start(persister operator.Persister) error {

// If useThreadPool is enabled, kick off the worker threads
if useThreadPool.IsEnabled() {
m.kickoffThreads(ctx)
m.once.Do(func() {
m.kickoffThreads(ctx)
})
}

// Start polling goroutine
Expand Down
27 changes: 15 additions & 12 deletions pkg/stanza/fileconsumer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,10 @@ func TestAddFileFields(t *testing.T) {
// AddFileResolvedFields tests that the `log.file.name_resolved` and `log.file.path_resolved` fields are included
// when IncludeFileNameResolved and IncludeFilePathResolved are set to true
func TestAddFileResolvedFields(t *testing.T) {
t.Parallel()
if runtime.GOOS == windowsOS {
t.Skip("Windows symlinks usage disabled for now. See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21088")
}
t.Parallel()

tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
Expand Down Expand Up @@ -460,7 +460,7 @@ func TestReadNewLogs(t *testing.T) {
tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
operator, emitCalls := buildTestManager(t, cfg, withReaderChan())
operator, emitCalls := buildTestManager(t, cfg)
operator.persister = testutil.NewMockPersister("test")

// Poll once so we know this isn't a new file
Expand Down Expand Up @@ -488,7 +488,7 @@ func TestReadExistingAndNewLogs(t *testing.T) {
tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
operator, emitCalls := buildTestManager(t, cfg, withReaderChan())
operator, emitCalls := buildTestManager(t, cfg)
operator.persister = testutil.NewMockPersister("test")

// Start with a file with an entry in it, and expect that entry
Expand All @@ -512,7 +512,7 @@ func TestStartAtEnd(t *testing.T) {

tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
operator, emitCalls := buildTestManager(t, cfg, withReaderChan())
operator, emitCalls := buildTestManager(t, cfg)
operator.persister = testutil.NewMockPersister("test")

temp := openTemp(t, tempDir)
Expand Down Expand Up @@ -540,7 +540,7 @@ func TestStartAtEndNewFile(t *testing.T) {
tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
operator, emitCalls := buildTestManager(t, cfg, withReaderChan())
operator, emitCalls := buildTestManager(t, cfg)
operator.persister = testutil.NewMockPersister("test")

operator.poll(context.Background())
Expand Down Expand Up @@ -656,7 +656,7 @@ func TestSplitWrite(t *testing.T) {
tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
operator, emitCalls := buildTestManager(t, cfg, withReaderChan())
operator, emitCalls := buildTestManager(t, cfg)
operator.persister = testutil.NewMockPersister("test")

temp := openTemp(t, tempDir)
Expand All @@ -676,7 +676,7 @@ func TestIgnoreEmptyFiles(t *testing.T) {
tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
operator, emitCalls := buildTestManager(t, cfg, withReaderChan())
operator, emitCalls := buildTestManager(t, cfg)
operator.persister = testutil.NewMockPersister("test")

temp := openTemp(t, tempDir)
Expand Down Expand Up @@ -997,7 +997,7 @@ func TestFileBatching(t *testing.T) {
cfg.MaxConcurrentFiles = maxConcurrentFiles
cfg.MaxBatches = maxBatches
emitCalls := make(chan *emitParams, files*linesPerFile)
operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls), withReaderChan())
operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls))
operator.persister = testutil.NewMockPersister("test")

core, observedLogs := observer.New(zap.DebugLevel)
Expand Down Expand Up @@ -1353,7 +1353,7 @@ func TestDeleteAfterRead(t *testing.T) {
cfg.StartAt = "beginning"
cfg.DeleteAfterRead = true
emitCalls := make(chan *emitParams, totalLines)
operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls), withReaderChan())
operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls))
operator.persister = testutil.NewMockPersister("test")

operator.poll(context.Background())
Expand Down Expand Up @@ -1385,7 +1385,7 @@ func TestMaxBatching(t *testing.T) {
cfg.MaxConcurrentFiles = maxConcurrentFiles
cfg.MaxBatches = maxBatches
emitCalls := make(chan *emitParams, files*linesPerFile)
operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls), withReaderChan())
operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls))
operator.persister = testutil.NewMockPersister("test")

core, observedLogs := observer.New(zap.DebugLevel)
Expand Down Expand Up @@ -1502,7 +1502,7 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) {
cfg.StartAt = "beginning"
cfg.DeleteAfterRead = true
emitCalls := make(chan *emitParams, longFileLines+1)
operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls), withReaderChan())
operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls))
operator.persister = testutil.NewMockPersister("test")

shortFile := openTemp(t, tempDir)
Expand Down Expand Up @@ -1544,6 +1544,9 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) {

// Stop consuming before long file has been fully consumed
cancel()

// Stop the worker threads
// Following section is a no-op if feature gate is disabled, so no need to check explicitly
operator.cancel()
if operator.readerChan != nil {
close(operator.readerChan)
Expand Down Expand Up @@ -1661,7 +1664,7 @@ func TestStalePartialFingerprintDiscarded(t *testing.T) {
cfg := NewConfig().includeDir(tempDir)
cfg.FingerprintSize = 18
cfg.StartAt = "beginning"
operator, emitCalls := buildTestManager(t, cfg, withReaderChan())
operator, emitCalls := buildTestManager(t, cfg)
operator.persister = testutil.NewMockPersister("test")

// Both of they will be include
Expand Down
8 changes: 4 additions & 4 deletions pkg/stanza/fileconsumer/rotation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func TestMoveFile(t *testing.T) {
tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
operator, emitCalls := buildTestManager(t, cfg, withReaderChan())
operator, emitCalls := buildTestManager(t, cfg)
operator.persister = testutil.NewMockPersister("test")

temp1 := openTemp(t, tempDir)
Expand Down Expand Up @@ -389,7 +389,7 @@ func TestTrackMovedAwayFiles(t *testing.T) {
tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
operator, emitCalls := buildTestManager(t, cfg, withReaderChan())
operator, emitCalls := buildTestManager(t, cfg)
operator.persister = testutil.NewMockPersister("test")

temp1 := openTemp(t, tempDir)
Expand Down Expand Up @@ -474,7 +474,7 @@ func TestTruncateThenWrite(t *testing.T) {
tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
operator, emitCalls := buildTestManager(t, cfg, withReaderChan())
operator, emitCalls := buildTestManager(t, cfg)
operator.persister = testutil.NewMockPersister("test")

temp1 := openTemp(t, tempDir)
Expand Down Expand Up @@ -511,7 +511,7 @@ func TestCopyTruncateWriteBoth(t *testing.T) {
tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
operator, emitCalls := buildTestManager(t, cfg, withReaderChan())
operator, emitCalls := buildTestManager(t, cfg)
operator.persister = testutil.NewMockPersister("test")

temp1 := openTemp(t, tempDir)
Expand Down
19 changes: 5 additions & 14 deletions pkg/stanza/fileconsumer/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ type emitParams struct {
}

type testManagerConfig struct {
emitChan chan *emitParams
initializeChannel bool
emitChan chan *emitParams
}

type testManagerOption func(*testManagerConfig)
Expand All @@ -85,27 +84,19 @@ func withEmitChan(emitChan chan *emitParams) testManagerOption {
}
}

func withReaderChan() testManagerOption {
return func(c *testManagerConfig) {
c.initializeChannel = true
}
}

func buildTestManager(t testing.TB, cfg *Config, opts ...testManagerOption) (*Manager, chan *emitParams) {
tmc := &testManagerConfig{emitChan: make(chan *emitParams, 100)}
for _, opt := range opts {
opt(tmc)
}
input, err := cfg.Build(testutil.Logger(t), testEmitFunc(tmc.emitChan))
require.NoError(t, err)
if tmc.initializeChannel && useThreadPool.IsEnabled() {
input.readerChan = make(chan readerEnvelope, cfg.MaxConcurrentFiles/2)
if useThreadPool.IsEnabled() {
ctx, cancel := context.WithCancel(context.Background())
input.cancel = cancel
for i := 0; i < cfg.MaxConcurrentFiles/2; i++ {
input.workerWg.Add(1)
go input.worker(ctx)
}
input.once.Do(func() {
input.kickoffThreads(ctx)
})
}
return input, tmc.emitChan
}
Expand Down

0 comments on commit cae3290

Please sign in to comment.