From dbc668dec949fa6cb9d1cb4bd160a66b2e539c54 Mon Sep 17 00:00:00 2001 From: Vihas Splunk Date: Fri, 11 Aug 2023 11:52:46 +0530 Subject: [PATCH 01/19] chore: add new feature gate, use locking --- pkg/stanza/fileconsumer/benchmark_test.go | 69 +++++++ pkg/stanza/fileconsumer/config.go | 17 +- pkg/stanza/fileconsumer/file.go | 53 +++++- pkg/stanza/fileconsumer/file_test.go | 50 +++-- pkg/stanza/fileconsumer/file_threadpool.go | 203 +++++++++++++++++++++ pkg/stanza/fileconsumer/rotation_test.go | 10 +- pkg/stanza/fileconsumer/util_test.go | 20 +- receiver/filelogreceiver/filelog_test.go | 66 +++++-- 8 files changed, 443 insertions(+), 45 deletions(-) create mode 100755 pkg/stanza/fileconsumer/file_threadpool.go diff --git a/pkg/stanza/fileconsumer/benchmark_test.go b/pkg/stanza/fileconsumer/benchmark_test.go index 42315e78bbb9..7405de8be5a6 100644 --- a/pkg/stanza/fileconsumer/benchmark_test.go +++ b/pkg/stanza/fileconsumer/benchmark_test.go @@ -4,9 +4,13 @@ package fileconsumer import ( + "context" + "fmt" "os" "path/filepath" + "sync" "testing" + "time" "github.com/stretchr/testify/require" @@ -180,3 +184,68 @@ func BenchmarkFileInput(b *testing.B) { }) } } + +func BenchmarkLogsThroughput(b *testing.B) { + getMessage := func(f, m int) string { return fmt.Sprintf("file %d, message %d\n", f, m) } + rootDir := b.TempDir() + file := openFile(b, filepath.Join(rootDir, "file0.log")) + file1 := openFile(b, filepath.Join(rootDir, "file1.log")) + cfg := NewConfig().includeDir(rootDir) + cfg.StartAt = "beginning" + cfg.MaxConcurrentFiles = 8 + emitCalls := make(chan *emitParams, b.N*5) + operator, _ := buildTestManager(b, cfg, withEmitChan(emitCalls), withReaderChan()) + operator.persister = testutil.NewMockPersister("test") + + total := b.N * 100 + factor := 2000 + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + totalLogs := 2*total + 2*(total/factor) + for i := 0; i < totalLogs; i++ { + <-emitCalls + // c := <-emitCalls + } + // fmt.Println(totalLogs) 
+ }() + b.ResetTimer() + + // write more logs in one file + for i := 0; i < total; i++ { + writeString(b, file, getMessage(0, i)) + } + // write less logs in one file + for i := 0; i < total/factor; i++ { + writeString(b, file1, getMessage(1, i)) + } + + start := time.Now() + + if useThreadPool.IsEnabled() { + operator.pollConcurrent(context.Background()) + } else { + operator.poll(context.Background()) + } + // // create different files for second poll + file = openFile(b, filepath.Join(rootDir, "file2.log")) + file1 = openFile(b, filepath.Join(rootDir, "file3.log")) + // // write more logs in one file + for i := 0; i < total; i++ { + writeString(b, file, getMessage(2, i)) + } + // write less logs in one file + for i := 0; i < total/factor; i++ { + writeString(b, file1, getMessage(3, i)) + } + // start2 := time.Now() + if useThreadPool.IsEnabled() { + operator.pollConcurrent(context.Background()) + } else { + operator.poll(context.Background()) + } + wg.Wait() + fmt.Println(time.Now().Sub(start), b.N) +} diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index cf02b131aa64..802c4fbb2bc8 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -15,6 +15,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/trie" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" @@ -32,6 +33,13 @@ var allowFileDeletion = featuregate.GlobalRegistry().MustRegister( 
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/16314"), ) +var useThreadPool = featuregate.GlobalRegistry().MustRegister( + "filelog.useThreadPool", + featuregate.StageAlpha, + featuregate.WithRegisterDescription("When enabled, log collection switches to a thread pool model, respecting the `poll_interval` config."), + // featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/16314"), +) + var AllowHeaderMetadataParsing = featuregate.GlobalRegistry().MustRegister( "filelog.allowHeaderMetadataParsing", featuregate.StageAlpha, @@ -145,7 +153,7 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, fact return nil, err } - return &Manager{ + manager := Manager{ SugaredLogger: logger.With("component", "fileconsumer"), cancel: func() {}, readerFactory: readerFactory{ @@ -172,7 +180,12 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, fact deleteAfterRead: c.DeleteAfterRead, knownFiles: make([]*reader, 0, 10), seenPaths: make(map[string]struct{}, 100), - }, nil + } + if useThreadPool.IsEnabled() { + manager.readerChan = make(chan readerWrapper, c.MaxConcurrentFiles) + manager.trie = trie.NewTrie() + } + return &manager, nil } func (c Config) validate() error { diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index d87546a2b5c4..8697b6a61354 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -15,6 +15,7 @@ import ( "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/trie" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" ) @@ -45,6 +46,17 @@ type Manager 
struct { seenPaths map[string]struct{} currentFps []*fingerprint.Fingerprint + + // Following fields are used only when useThreadPool is enabled + workerWg sync.WaitGroup + _workerWg sync.WaitGroup + knownFilesLock sync.RWMutex + + readerChan chan readerWrapper + trieLock sync.RWMutex + + // TRIE - this data structure stores the fingerprint of the files which are currently being consumed + trie *trie.Trie } func (m *Manager) Start(persister operator.Persister) error { @@ -61,6 +73,11 @@ func (m *Manager) Start(persister operator.Persister) error { m.Warnw("finding files", "error", err.Error()) } + // If useThreadPool is enabled, kick off the worker threads + if useThreadPool.IsEnabled() { + m.kickOffThreads(ctx) + } + // Start polling goroutine m.startPoller(ctx) @@ -71,6 +88,10 @@ func (m *Manager) Start(persister operator.Persister) error { func (m *Manager) Stop() error { m.cancel() m.wg.Wait() + if useThreadPool.IsEnabled() { + m.shutdownThreads() + } + m.roller.cleanup() for _, reader := range m.knownFiles { reader.Close() @@ -95,14 +116,21 @@ func (m *Manager) startPoller(ctx context.Context) { return case <-globTicker.C: } - m.poll(ctx) } }() } -// poll checks all the watched paths for new entries func (m *Manager) poll(ctx context.Context) { + if useThreadPool.IsEnabled() { + m.pollConcurrent(ctx) + } else { + m.pollRegular(ctx) + } +} + +// poll checks all the watched paths for new entries +func (m *Manager) pollRegular(ctx context.Context) { // Increment the generation on all known readers // This is done here because the next generation is about to start for i := 0; i < len(m.knownFiles); i++ { @@ -134,6 +162,18 @@ func (m *Manager) poll(ctx context.Context) { m.consume(ctx, matches) } +func (m *Manager) readToEnd(ctx context.Context, r *reader) bool { + r.ReadToEnd(ctx) + if m.deleteAfterRead && r.eof { + r.Close() + if err := os.Remove(r.file.Name()); err != nil { + m.Errorf("could not delete %s", r.file.Name()) + } + return true + } + return false +} 
+ func (m *Manager) consume(ctx context.Context, paths []string) { m.Debug("Consuming files") readers := make([]*reader, 0, len(paths)) @@ -154,14 +194,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) { wg.Add(1) go func(r *reader) { defer wg.Done() - r.ReadToEnd(ctx) - // Delete a file if deleteAfterRead is enabled and we reached the end of the file - if m.deleteAfterRead && r.eof { - r.Close() - if err := os.Remove(r.file.Name()); err != nil { - m.Errorf("could not delete %s", r.file.Name()) - } - } + m.readToEnd(ctx, r) }(r) } wg.Wait() diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index d04d4e64b413..9d750d0d9d65 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ b/pkg/stanza/fileconsumer/file_test.go @@ -26,6 +26,20 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" ) +func TestMain(m *testing.M) { + // Run once with thread pool featuregate enabled + featuregate.GlobalRegistry().Set(useThreadPool.ID(), true) //nolint:all + if code := m.Run(); code > 0 { + os.Exit(code) + } + featuregate.GlobalRegistry().Set(useThreadPool.ID(), false) //nolint:all + + // Run once with thread pool featuregate disabled + if code := m.Run(); code > 0 { + os.Exit(code) + } +} + func TestCleanStop(t *testing.T) { t.Parallel() t.Skip(`Skipping due to goroutine leak in opencensus. @@ -78,10 +92,10 @@ func TestAddFileFields(t *testing.T) { // AddFileResolvedFields tests that the `log.file.name_resolved` and `log.file.path_resolved` fields are included // when IncludeFileNameResolved and IncludeFilePathResolved are set to true func TestAddFileResolvedFields(t *testing.T) { + t.Parallel() if runtime.GOOS == windowsOS { t.Skip("Windows symlinks usage disabled for now. 
See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21088") } - t.Parallel() tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) @@ -445,7 +459,7 @@ func TestReadNewLogs(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, withReaderChan()) operator.persister = testutil.NewMockPersister("test") // Poll once so we know this isn't a new file @@ -473,7 +487,7 @@ func TestReadExistingAndNewLogs(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, withReaderChan()) operator.persister = testutil.NewMockPersister("test") // Start with a file with an entry in it, and expect that entry @@ -497,7 +511,7 @@ func TestStartAtEnd(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, withReaderChan()) operator.persister = testutil.NewMockPersister("test") temp := openTemp(t, tempDir) @@ -525,7 +539,7 @@ func TestStartAtEndNewFile(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, withReaderChan()) operator.persister = testutil.NewMockPersister("test") operator.poll(context.Background()) @@ -642,7 +656,7 @@ func TestSplitWrite(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, withReaderChan()) operator.persister = testutil.NewMockPersister("test") temp := openTemp(t, tempDir) @@ -662,7 +676,7 @@ func TestIgnoreEmptyFiles(t 
*testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, withReaderChan()) operator.persister = testutil.NewMockPersister("test") temp := openTemp(t, tempDir) @@ -967,6 +981,9 @@ func TestManyLogsDelivered(t *testing.T) { func TestFileBatching(t *testing.T) { t.Parallel() + if useThreadPool.IsEnabled() { + t.Skip(`Skipping for thread pool feature gate, as there's no concept of batching for thread pool`) + } files := 100 linesPerFile := 10 @@ -983,7 +1000,7 @@ func TestFileBatching(t *testing.T) { cfg.MaxConcurrentFiles = maxConcurrentFiles cfg.MaxBatches = maxBatches emitCalls := make(chan *emitParams, files*linesPerFile) - operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls)) + operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls), withReaderChan()) operator.persister = testutil.NewMockPersister("test") core, observedLogs := observer.New(zap.DebugLevel) @@ -1339,7 +1356,8 @@ func TestDeleteAfterRead(t *testing.T) { cfg.StartAt = "beginning" cfg.DeleteAfterRead = true emitCalls := make(chan *emitParams, totalLines) - operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls)) + operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls), withReaderChan()) + operator.persister = testutil.NewMockPersister("test") operator.poll(context.Background()) actualTokens = append(actualTokens, waitForNTokens(t, emitCalls, totalLines)...) 
@@ -1353,6 +1371,9 @@ func TestDeleteAfterRead(t *testing.T) { } func TestMaxBatching(t *testing.T) { + if useThreadPool.IsEnabled() { + t.Skip(`Skipping for thread pool feature gate, as there's no concept of batching for thread pool`) + } t.Parallel() files := 50 @@ -1370,7 +1391,7 @@ func TestMaxBatching(t *testing.T) { cfg.MaxConcurrentFiles = maxConcurrentFiles cfg.MaxBatches = maxBatches emitCalls := make(chan *emitParams, files*linesPerFile) - operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls)) + operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls), withReaderChan()) operator.persister = testutil.NewMockPersister("test") core, observedLogs := observer.New(zap.DebugLevel) @@ -1486,7 +1507,7 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) { cfg.StartAt = "beginning" cfg.DeleteAfterRead = true emitCalls := make(chan *emitParams, longFileLines+1) - operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls)) + operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls), withReaderChan()) operator.persister = testutil.NewMockPersister("test") shortFile := openTemp(t, tempDir) @@ -1526,6 +1547,8 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) { // Stop consuming before long file has been fully consumed cancel() + operator.cancel() + operator.workerWg.Wait() wg.Wait() // short file was fully consumed and should have been deleted @@ -1609,6 +1632,9 @@ func TestHeaderPersistanceInHeader(t *testing.T) { // one poll operation occurs between now and when we stop. 
op1.poll(context.Background()) + // for threadpool, as the poll is asynchronous, allow it to complete one poll cycle + time.Sleep(500 * time.Millisecond) + require.NoError(t, op1.Stop()) writeString(t, temp, "|headerField2: headerValue2\nlog line\n") @@ -1636,7 +1662,7 @@ func TestStalePartialFingerprintDiscarded(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.FingerprintSize = 18 cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, withReaderChan()) operator.persister = testutil.NewMockPersister("test") // Both of they will be include diff --git a/pkg/stanza/fileconsumer/file_threadpool.go b/pkg/stanza/fileconsumer/file_threadpool.go new file mode 100755 index 000000000000..ba65156a1400 --- /dev/null +++ b/pkg/stanza/fileconsumer/file_threadpool.go @@ -0,0 +1,203 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" + +import ( + "context" + "os" + "sync" + + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" +) + +type readerWrapper struct { + reader *reader + fp *fingerprint.Fingerprint + path string +} + +func (m *Manager) kickoffThreads(ctx context.Context) { + m.readerChan = make(chan readerWrapper, m.maxBatchFiles*2) + for i := 0; i < m.maxBatchFiles; i++ { + m.workerWg.Add(1) + go m.worker(ctx) + } +} + +func (m *Manager) shutdownThreads() { + if m.readerChan != nil { + close(m.readerChan) + } + m.workerWg.Wait() + // save off any files left + // As we already cancelled our current context, create a new one to save any left offsets + // This is only applicable for `filelog.useThreadPool` featuregate + ctx, cancel := context.WithCancel(context.Background()) + m.syncLastPollFilesConcurrent(ctx) + cancel() +} + +// poll checks all the watched paths for new 
entries +func (m *Manager) pollConcurrent(ctx context.Context) { + // Increment the generation on all known readers + // This is done here because the next generation is about to start + m.knownFilesLock.Lock() + for i := 0; i < len(m.knownFiles); i++ { + m.knownFiles[i].generation++ + } + m.knownFilesLock.Unlock() + + // Get the list of paths on disk + matches, err := m.fileMatcher.MatchFiles() + if err != nil { + m.Errorf("error finding files: %s", err) + } + m.consumeConcurrent(ctx, matches) + m.clearCurrentFingerprints() + + // Any new files that appear should be consumed entirely + m.readerFactory.fromBeginning = true + m.syncLastPollFilesConcurrent(ctx) +} + +func (m *Manager) worker(ctx context.Context) { + defer m.workerWg.Done() + for { + select { + case <-ctx.Done(): + return + case chanData, ok := <-m.readerChan: + if !ok { + return + } + r, fp := chanData.reader, chanData.fp + if !m.readToEnd(ctx, r) { + // Save off any files that were not fully read or if deleteAfterRead is disabled + m.knownFilesLock.Lock() + m.knownFiles = append(m.knownFiles, r) + m.knownFilesLock.Unlock() + } + m.removePath(fp) + } + + } +} + +func (m *Manager) makeReaderConcurrent(filePath string) (*reader, *fingerprint.Fingerprint) { + fp, file := m.makeFingerprint(filePath) + if fp == nil { + return nil, nil + } + + // check if the current file is already being consumed + if m.isCurrentlyConsuming(fp) { + if err := file.Close(); err != nil { + m.Errorf("problem closing file", "file", file.Name()) + } + return nil, nil + } + + // Exclude any empty fingerprints or duplicate fingerprints to avoid doubling up on copy-truncate files + if m.checkDuplicates(fp) { + if err := file.Close(); err != nil { + m.Errorf("problem closing file", "file", file.Name()) + } + return nil, nil + } + m.currentFps = append(m.currentFps, fp) + + reader, err := m.newReaderConcurrent(file, fp) + if err != nil { + m.Errorw("Failed to create reader", zap.Error(err)) + return nil, nil + } + return reader, fp 
+} + +func (m *Manager) consumeConcurrent(ctx context.Context, paths []string) { + m.clearOldReadersConcurrent(ctx) + for _, path := range paths { + reader, fp := m.makeReaderConcurrent(path) + if reader != nil { + // add path and fingerprint as it's not consuming + m.trieLock.Lock() + m.trie.Put(fp.FirstBytes) + m.trieLock.Unlock() + m.readerChan <- readerWrapper{reader: reader, fp: fp, path: path} + } + } +} + +func (m *Manager) isCurrentlyConsuming(fp *fingerprint.Fingerprint) bool { + m.trieLock.RLock() + defer m.trieLock.RUnlock() + return m.trie.HasKey(fp.FirstBytes) +} + +func (m *Manager) removePath(fp *fingerprint.Fingerprint) { + m.trieLock.Lock() + defer m.trieLock.Unlock() + m.trie.Delete(fp.FirstBytes) +} + +func (m *Manager) clearOldReadersConcurrent(ctx context.Context) { + m.knownFilesLock.Lock() + defer m.knownFilesLock.Unlock() + // Clear out old readers. They are sorted such that they are oldest first, + // so we can just find the first reader whose poll cycle is less than our + // limit i.e. 
last 3 cycles, and keep every reader after that + oldReaders := make([]*reader, 0) + for i := 0; i < len(m.knownFiles); i++ { + reader := m.knownFiles[i] + if reader.generation < 3 { + oldReaders = m.knownFiles[:i] + m.knownFiles = m.knownFiles[i:] + break + } + } + + if len(m.knownFiles) > 0 && m.knownFiles[len(m.knownFiles)-1].generation >= 3 { + oldReaders = m.knownFiles[:len(m.knownFiles)] + m.knownFiles = m.knownFiles[len(m.knownFiles):] + } + + var lostWG sync.WaitGroup + for _, r := range oldReaders { + lostWG.Add(1) + go func(r *reader) { + defer lostWG.Done() + r.ReadToEnd(ctx) + r.Close() + }(r) + } + lostWG.Wait() +} + +func (m *Manager) newReaderConcurrent(file *os.File, fp *fingerprint.Fingerprint) (*reader, error) { + // Check if the new path has the same fingerprint as an old path + if oldReader, ok := m.findFingerprintMatchConcurrent(fp); ok { + return m.readerFactory.copy(oldReader, file) + } + + // If we don't match any previously known files, create a new reader from scratch + return m.readerFactory.newReader(file, fp.Copy()) +} + +func (m *Manager) findFingerprintMatchConcurrent(fp *fingerprint.Fingerprint) (*reader, bool) { + // Iterate backwards to match newest first + m.knownFilesLock.Lock() + defer m.knownFilesLock.Unlock() + + return m.findFingerprintMatch(fp) +} + +// syncLastPollFiles syncs the most recent set of files to the database +func (m *Manager) syncLastPollFilesConcurrent(ctx context.Context) { + m.knownFilesLock.RLock() + defer m.knownFilesLock.RUnlock() + + m.syncLastPollFiles(ctx) +} diff --git a/pkg/stanza/fileconsumer/rotation_test.go b/pkg/stanza/fileconsumer/rotation_test.go index 5c02fddd86e4..897596293e3a 100644 --- a/pkg/stanza/fileconsumer/rotation_test.go +++ b/pkg/stanza/fileconsumer/rotation_test.go @@ -357,7 +357,7 @@ func TestMoveFile(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := 
buildTestManager(t, cfg, withReaderChan()) operator.persister = testutil.NewMockPersister("test") temp1 := openTemp(t, tempDir) @@ -389,7 +389,7 @@ func TestTrackMovedAwayFiles(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, withReaderChan()) operator.persister = testutil.NewMockPersister("test") temp1 := openTemp(t, tempDir) @@ -418,6 +418,8 @@ func TestTrackMovedAwayFiles(t *testing.T) { require.NoError(t, err) writeString(t, movedFile, "testlog2\n") operator.poll(context.Background()) + operator.poll(context.Background()) + operator.poll(context.Background()) waitForToken(t, emitCalls, []byte("testlog2")) } @@ -474,7 +476,7 @@ func TestTruncateThenWrite(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, withReaderChan()) operator.persister = testutil.NewMockPersister("test") temp1 := openTemp(t, tempDir) @@ -511,7 +513,7 @@ func TestCopyTruncateWriteBoth(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, withReaderChan()) operator.persister = testutil.NewMockPersister("test") temp1 := openTemp(t, tempDir) diff --git a/pkg/stanza/fileconsumer/util_test.go b/pkg/stanza/fileconsumer/util_test.go index d3dc84e983d3..5408ebfabb94 100644 --- a/pkg/stanza/fileconsumer/util_test.go +++ b/pkg/stanza/fileconsumer/util_test.go @@ -73,7 +73,8 @@ type emitParams struct { } type testManagerConfig struct { - emitChan chan *emitParams + emitChan chan *emitParams + initializeChannel bool } type testManagerOption func(*testManagerConfig) @@ -84,13 +85,28 @@ func withEmitChan(emitChan chan *emitParams) testManagerOption { } } 
-func buildTestManager(t *testing.T, cfg *Config, opts ...testManagerOption) (*Manager, chan *emitParams) { +func withReaderChan() testManagerOption { + return func(c *testManagerConfig) { + c.initializeChannel = true + } +} + +func buildTestManager(t testing.TB, cfg *Config, opts ...testManagerOption) (*Manager, chan *emitParams) { tmc := &testManagerConfig{emitChan: make(chan *emitParams, 100)} for _, opt := range opts { opt(tmc) } input, err := cfg.Build(testutil.Logger(t), testEmitFunc(tmc.emitChan)) require.NoError(t, err) + if tmc.initializeChannel && useThreadPool.IsEnabled() { + input.readerChan = make(chan readerWrapper, cfg.MaxConcurrentFiles) + ctx, cancel := context.WithCancel(context.Background()) + input.cancel = cancel + for i := 0; i < cfg.MaxConcurrentFiles/2; i++ { + input.workerWg.Add(1) + go input.worker(ctx) + } + } return input, tmc.emitChan } diff --git a/receiver/filelogreceiver/filelog_test.go b/receiver/filelogreceiver/filelog_test.go index 4a8d5ef4af4d..129d52b1468e 100644 --- a/receiver/filelogreceiver/filelog_test.go +++ b/receiver/filelogreceiver/filelog_test.go @@ -22,6 +22,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/confmap/confmaptest" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/receiver/receivertest" "go.uber.org/zap" @@ -127,28 +128,56 @@ func TestReadRotatingFiles(t *testing.T) { tests := []rotationTest{ { - name: "CopyTruncateTimestamped", - copyTruncate: true, - sequential: false, + name: "CopyTruncateTimestamped", + copyTruncate: true, + sequential: false, + enableThreadPool: false, }, { - name: "CopyTruncateSequential", - copyTruncate: true, - sequential: true, + name: "CopyTruncateSequential", + copyTruncate: true, + sequential: true, + enableThreadPool: false, + }, + { + name: "CopyTruncateTimestampedThreadPool", + copyTruncate: true, + 
sequential: false, + enableThreadPool: true, + }, + { + name: "CopyTruncateSequentialThreadPool", + copyTruncate: true, + sequential: true, + enableThreadPool: true, }, } if runtime.GOOS != "windows" { // Windows has very poor support for moving active files, so rotation is less commonly used tests = append(tests, []rotationTest{ { - name: "MoveCreateTimestamped", - copyTruncate: false, - sequential: false, + name: "MoveCreateTimestampedThreadPool", + copyTruncate: false, + sequential: false, + enableThreadPool: true, }, { - name: "MoveCreateSequential", - copyTruncate: false, - sequential: true, + name: "MoveCreateSequentialThreadPool", + copyTruncate: false, + sequential: true, + enableThreadPool: true, + }, + { + name: "MoveCreateTimestamped", + copyTruncate: false, + sequential: false, + enableThreadPool: false, + }, + { + name: "MoveCreateSequential", + copyTruncate: false, + sequential: true, + enableThreadPool: false, }, }...) } @@ -159,9 +188,10 @@ func TestReadRotatingFiles(t *testing.T) { } type rotationTest struct { - name string - copyTruncate bool - sequential bool + name string + copyTruncate bool + sequential bool + enableThreadPool bool } func (rt *rotationTest) Run(t *testing.T) { @@ -169,6 +199,12 @@ func (rt *rotationTest) Run(t *testing.T) { tempDir := t.TempDir() + if rt.enableThreadPool { + t.Cleanup(func() { + require.NoError(t, featuregate.GlobalRegistry().Set("filelog.useThreadPool", false)) + }) + require.NoError(t, featuregate.GlobalRegistry().Set("filelog.useThreadPool", true)) + } f := NewFactory() sink := new(consumertest.LogsSink) From b9911371b6ef54a6d324715dc18a257e68297e8c Mon Sep 17 00:00:00 2001 From: Vihas Splunk Date: Fri, 18 Aug 2023 18:00:05 +0530 Subject: [PATCH 02/19] fix: remove unnecessary --- pkg/stanza/fileconsumer/benchmark_test.go | 69 ----------------------- 1 file changed, 69 deletions(-) diff --git a/pkg/stanza/fileconsumer/benchmark_test.go b/pkg/stanza/fileconsumer/benchmark_test.go index 
7405de8be5a6..42315e78bbb9 100644 --- a/pkg/stanza/fileconsumer/benchmark_test.go +++ b/pkg/stanza/fileconsumer/benchmark_test.go @@ -4,13 +4,9 @@ package fileconsumer import ( - "context" - "fmt" "os" "path/filepath" - "sync" "testing" - "time" "github.com/stretchr/testify/require" @@ -184,68 +180,3 @@ func BenchmarkFileInput(b *testing.B) { }) } } - -func BenchmarkLogsThroughput(b *testing.B) { - getMessage := func(f, m int) string { return fmt.Sprintf("file %d, message %d\n", f, m) } - rootDir := b.TempDir() - file := openFile(b, filepath.Join(rootDir, "file0.log")) - file1 := openFile(b, filepath.Join(rootDir, "file1.log")) - cfg := NewConfig().includeDir(rootDir) - cfg.StartAt = "beginning" - cfg.MaxConcurrentFiles = 8 - emitCalls := make(chan *emitParams, b.N*5) - operator, _ := buildTestManager(b, cfg, withEmitChan(emitCalls), withReaderChan()) - operator.persister = testutil.NewMockPersister("test") - - total := b.N * 100 - factor := 2000 - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - totalLogs := 2*total + 2*(total/factor) - for i := 0; i < totalLogs; i++ { - <-emitCalls - // c := <-emitCalls - } - // fmt.Println(totalLogs) - }() - b.ResetTimer() - - // write more logs in one file - for i := 0; i < total; i++ { - writeString(b, file, getMessage(0, i)) - } - // write less logs in one file - for i := 0; i < total/factor; i++ { - writeString(b, file1, getMessage(1, i)) - } - - start := time.Now() - - if useThreadPool.IsEnabled() { - operator.pollConcurrent(context.Background()) - } else { - operator.poll(context.Background()) - } - // // create different files for second poll - file = openFile(b, filepath.Join(rootDir, "file2.log")) - file1 = openFile(b, filepath.Join(rootDir, "file3.log")) - // // write more logs in one file - for i := 0; i < total; i++ { - writeString(b, file, getMessage(2, i)) - } - // write less logs in one file - for i := 0; i < total/factor; i++ { - writeString(b, file1, getMessage(3, i)) - } - // start2 := 
time.Now() - if useThreadPool.IsEnabled() { - operator.pollConcurrent(context.Background()) - } else { - operator.poll(context.Background()) - } - wg.Wait() - fmt.Println(time.Now().Sub(start), b.N) -} From 4e5dead335a5b7a5bbeabb58a5e3ed239c273e52 Mon Sep 17 00:00:00 2001 From: Vihas Splunk Date: Fri, 18 Aug 2023 18:03:20 +0530 Subject: [PATCH 03/19] fix: error --- pkg/stanza/fileconsumer/file.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index d15021fef3de..a002f2f3e64e 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -75,7 +75,7 @@ func (m *Manager) Start(persister operator.Persister) error { // If useThreadPool is enabled, kick off the worker threads if useThreadPool.IsEnabled() { - m.kickOffThreads(ctx) + m.kickoffThreads(ctx) } // Start polling goroutine From 1e3c11e704b20a73deb80fc87d13e29eda44f9cf Mon Sep 17 00:00:00 2001 From: Vihas Splunk Date: Fri, 18 Aug 2023 18:09:31 +0530 Subject: [PATCH 04/19] chore: add changelog --- .chloggen/add-threadpool-featuregate.yaml | 27 ++++++++++++++++++++++ pkg/stanza/fileconsumer/file.go | 3 ++- pkg/stanza/fileconsumer/file_threadpool.go | 3 +-- 3 files changed, 30 insertions(+), 3 deletions(-) create mode 100644 .chloggen/add-threadpool-featuregate.yaml diff --git a/.chloggen/add-threadpool-featuregate.yaml b/.chloggen/add-threadpool-featuregate.yaml new file mode 100644 index 000000000000..e8143af4c543 --- /dev/null +++ b/.chloggen/add-threadpool-featuregate.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: fileconsumer + +# A brief description of the change. 
Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Added a new feature gate that enables a thread pool mechanism to respect the poll_interval parameter. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [18908] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index a002f2f3e64e..feea91cee3fd 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -49,7 +49,6 @@ type Manager struct { // Following fields are used only when useThreadPool is enabled workerWg sync.WaitGroup - _workerWg sync.WaitGroup knownFilesLock sync.RWMutex readerChan chan readerWrapper @@ -116,11 +115,13 @@ func (m *Manager) startPoller(ctx context.Context) { return case <-globTicker.C: } + m.poll(ctx) } }() } +// poll checks all the watched paths for new entries func (m *Manager) poll(ctx context.Context) { if useThreadPool.IsEnabled() { m.pollConcurrent(ctx) diff --git a/pkg/stanza/fileconsumer/file_threadpool.go b/pkg/stanza/fileconsumer/file_threadpool.go index ba65156a1400..b444fadf951a 100755 --- a/pkg/stanza/fileconsumer/file_threadpool.go +++ b/pkg/stanza/fileconsumer/file_threadpool.go @@ -16,7 +16,6 @@ import ( type readerWrapper struct { reader *reader 
fp *fingerprint.Fingerprint - path string } func (m *Manager) kickoffThreads(ctx context.Context) { @@ -126,7 +125,7 @@ func (m *Manager) consumeConcurrent(ctx context.Context, paths []string) { m.trieLock.Lock() m.trie.Put(fp.FirstBytes) m.trieLock.Unlock() - m.readerChan <- readerWrapper{reader: reader, fp: fp, path: path} + m.readerChan <- readerWrapper{reader: reader, fp: fp} } } } From 93dc34a455e123448d18e40ef548d3e25dc91ebb Mon Sep 17 00:00:00 2001 From: Vihas Splunk Date: Fri, 18 Aug 2023 18:27:03 +0530 Subject: [PATCH 05/19] fix: refactor --- pkg/stanza/fileconsumer/file_threadpool.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/pkg/stanza/fileconsumer/file_threadpool.go b/pkg/stanza/fileconsumer/file_threadpool.go index b444fadf951a..a20e3a941772 100755 --- a/pkg/stanza/fileconsumer/file_threadpool.go +++ b/pkg/stanza/fileconsumer/file_threadpool.go @@ -149,19 +149,17 @@ func (m *Manager) clearOldReadersConcurrent(ctx context.Context) { // so we can just find the first reader whose poll cycle is less than our // limit i.e. 
last 3 cycles, and keep every reader after that oldReaders := make([]*reader, 0) - for i := 0; i < len(m.knownFiles); i++ { + i := 0 + for ; i < len(m.knownFiles); i++ { reader := m.knownFiles[i] - if reader.generation < 3 { - oldReaders = m.knownFiles[:i] - m.knownFiles = m.knownFiles[i:] + if reader.generation >= 3 { + oldReaders = append(oldReaders, reader) + i += 1 + } else { break } } - - if len(m.knownFiles) > 0 && m.knownFiles[len(m.knownFiles)-1].generation >= 3 { - oldReaders = m.knownFiles[:len(m.knownFiles)] - m.knownFiles = m.knownFiles[len(m.knownFiles):] - } + m.knownFiles = m.knownFiles[i:] var lostWG sync.WaitGroup for _, r := range oldReaders { From 2baec2e1f5eff05515e8f22f63e2b3993a809f46 Mon Sep 17 00:00:00 2001 From: Vihas Splunk Date: Fri, 18 Aug 2023 18:36:22 +0530 Subject: [PATCH 06/19] fix: test case failure --- pkg/stanza/fileconsumer/file_threadpool.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/stanza/fileconsumer/file_threadpool.go b/pkg/stanza/fileconsumer/file_threadpool.go index a20e3a941772..26b7308b1364 100755 --- a/pkg/stanza/fileconsumer/file_threadpool.go +++ b/pkg/stanza/fileconsumer/file_threadpool.go @@ -154,7 +154,6 @@ func (m *Manager) clearOldReadersConcurrent(ctx context.Context) { reader := m.knownFiles[i] if reader.generation >= 3 { oldReaders = append(oldReaders, reader) - i += 1 } else { break } From c364493025ce13c98b1ac64300603c5ce9615854 Mon Sep 17 00:00:00 2001 From: Vihas Splunk Date: Mon, 21 Aug 2023 17:35:17 +0530 Subject: [PATCH 07/19] fix spacing --- pkg/stanza/fileconsumer/file.go | 1 - pkg/stanza/fileconsumer/file_threadpool.go | 1 - 2 files changed, 2 deletions(-) diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index feea91cee3fd..475f5086fec7 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -115,7 +115,6 @@ func (m *Manager) startPoller(ctx context.Context) { return case <-globTicker.C: } - m.poll(ctx) } }() diff --git 
a/pkg/stanza/fileconsumer/file_threadpool.go b/pkg/stanza/fileconsumer/file_threadpool.go index 26b7308b1364..a47b4a74df87 100755 --- a/pkg/stanza/fileconsumer/file_threadpool.go +++ b/pkg/stanza/fileconsumer/file_threadpool.go @@ -81,7 +81,6 @@ func (m *Manager) worker(ctx context.Context) { } m.removePath(fp) } - } } From 879493cebbfd76a6e59f5d0b02814cf507ac3904 Mon Sep 17 00:00:00 2001 From: Vihas Splunk Date: Fri, 1 Sep 2023 12:51:00 +0530 Subject: [PATCH 08/19] fix: make clearOldReaders non-blocking --- pkg/stanza/fileconsumer/file_threadpool.go | 47 ++++++++++++---------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/pkg/stanza/fileconsumer/file_threadpool.go b/pkg/stanza/fileconsumer/file_threadpool.go index a47b4a74df87..8cc9bfd02870 100755 --- a/pkg/stanza/fileconsumer/file_threadpool.go +++ b/pkg/stanza/fileconsumer/file_threadpool.go @@ -6,7 +6,6 @@ package fileconsumer // import "github.com/open-telemetry/opentelemetry-collecto import ( "context" "os" - "sync" "go.uber.org/zap" @@ -14,8 +13,9 @@ import ( ) type readerWrapper struct { - reader *reader - fp *fingerprint.Fingerprint + reader *reader + trieKey *fingerprint.Fingerprint + close bool // indicate if we should close the file after reading, used when we detect lost readers } func (m *Manager) kickoffThreads(ctx context.Context) { @@ -68,18 +68,21 @@ func (m *Manager) worker(ctx context.Context) { select { case <-ctx.Done(): return - case chanData, ok := <-m.readerChan: + case wrapper, ok := <-m.readerChan: if !ok { return } - r, fp := chanData.reader, chanData.fp - if !m.readToEnd(ctx, r) { - // Save off any files that were not fully read or if deleteAfterRead is disabled + r, fp := wrapper.reader, wrapper.trieKey + if !m.readToEnd(ctx, r) && !wrapper.close { + // Save off any files that were not fully read or if deleteAfterRead is disabled. 
m.knownFilesLock.Lock() m.knownFiles = append(m.knownFiles, r) m.knownFilesLock.Unlock() + } else if wrapper.close { + // this is a lost reader, close it and release the file descriptor + r.Close() } - m.removePath(fp) + m.updateTrie(fp, false) } } } @@ -120,11 +123,9 @@ func (m *Manager) consumeConcurrent(ctx context.Context, paths []string) { for _, path := range paths { reader, fp := m.makeReaderConcurrent(path) if reader != nil { - // add path and fingerprint as it's not consuming - m.trieLock.Lock() - m.trie.Put(fp.FirstBytes) - m.trieLock.Unlock() - m.readerChan <- readerWrapper{reader: reader, fp: fp} + // add fingerprint to trie + m.updateTrie(fp, true) + m.readerChan <- readerWrapper{reader: reader, trieKey: fp} } } } @@ -135,10 +136,14 @@ func (m *Manager) isCurrentlyConsuming(fp *fingerprint.Fingerprint) bool { return m.trie.HasKey(fp.FirstBytes) } -func (m *Manager) removePath(fp *fingerprint.Fingerprint) { +func (m *Manager) updateTrie(fp *fingerprint.Fingerprint, insert bool) { m.trieLock.Lock() defer m.trieLock.Unlock() - m.trie.Delete(fp.FirstBytes) + if insert { + m.trie.Put(fp.FirstBytes) + } else { + m.trie.Delete(fp.FirstBytes) + } } func (m *Manager) clearOldReadersConcurrent(ctx context.Context) { @@ -159,16 +164,14 @@ func (m *Manager) clearOldReadersConcurrent(ctx context.Context) { } m.knownFiles = m.knownFiles[i:] - var lostWG sync.WaitGroup for _, r := range oldReaders { - lostWG.Add(1) - go func(r *reader) { - defer lostWG.Done() - r.ReadToEnd(ctx) + if m.isCurrentlyConsuming(r.Fingerprint) { r.Close() - }(r) + } else { + m.updateTrie(r.Fingerprint, true) + m.readerChan <- readerWrapper{reader: r, trieKey: r.Fingerprint, close: true} + } } - lostWG.Wait() } func (m *Manager) newReaderConcurrent(file *os.File, fp *fingerprint.Fingerprint) (*reader, error) { From bf39c05cd0e23b598c54c83b19f80d78c7aaf4e9 Mon Sep 17 00:00:00 2001 From: Vihas Splunk Date: Mon, 4 Sep 2023 19:06:16 +0530 Subject: [PATCH 09/19] add benchmark --- 
pkg/stanza/fileconsumer/benchmark_test.go | 72 +++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/pkg/stanza/fileconsumer/benchmark_test.go b/pkg/stanza/fileconsumer/benchmark_test.go index 42315e78bbb9..686bfbf01e9a 100644 --- a/pkg/stanza/fileconsumer/benchmark_test.go +++ b/pkg/stanza/fileconsumer/benchmark_test.go @@ -4,10 +4,15 @@ package fileconsumer import ( + "context" + "fmt" "os" "path/filepath" + "strings" + "sync" "testing" + "github.com/google/uuid" "github.com/stretchr/testify/require" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" @@ -20,6 +25,10 @@ type fileInputBenchmark struct { config func() *Config } +type fileSizeBenchmark struct { + sizes [2]int +} + type benchFile struct { *os.File log func(int) @@ -180,3 +189,66 @@ func BenchmarkFileInput(b *testing.B) { }) } } + +func max(x, y int) int { + if x < y { + return y + } + return x +} + +func (fileSize fileSizeBenchmark) createFiles(b *testing.B, rootDir string) { + // create 50 files, some with large file sizes, other's with rather small + getMessage := func(m int) string { return fmt.Sprintf("message %d", m) } + logs := make([]string, 0) + for i := 0; i < max(fileSize.sizes[0], fileSize.sizes[1]); i++ { + logs = append(logs, getMessage(i)) + } + + for i := 0; i < 50; i++ { + file := openFile(b, filepath.Join(rootDir, fmt.Sprintf("file_%s.log", uuid.NewString()))) + file.WriteString(uuid.NewString() + strings.Join(logs[:fileSize.sizes[i%2]], "\n") + "\n") + } +} + +func BenchmarkFileSizeVarying(b *testing.B) { + fileSize := fileSizeBenchmark{ + sizes: [2]int{b.N * 5000, b.N * 10}, + } + rootDir := b.TempDir() + cfg := NewConfig().includeDir(rootDir) + cfg.StartAt = "beginning" + cfg.MaxConcurrentFiles = 50 + totalLogs := fileSize.sizes[0]*50 + fileSize.sizes[1]*50 + emitCalls := make(chan *emitParams, totalLogs+10) + + operator, _ := buildTestManager(b, cfg, withEmitChan(emitCalls), withReaderChan()) + 
operator.persister = testutil.NewMockPersister("test") + defer func() { + require.NoError(b, operator.Stop()) + }() + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + var once sync.Once + for i := 0; i < totalLogs; i++ { + once.Do(func() { + // Reset once we get the first log + b.ResetTimer() + }) + <-emitCalls + } + // Stop the timer, as we're measuring log throughput + b.StopTimer() + }() + // create first set of files + fileSize.createFiles(b, rootDir) + operator.poll(context.Background()) + + // create new set of files, call poll() again + fileSize.createFiles(b, rootDir) + operator.poll(context.Background()) + + wg.Wait() +} From 7bd826c82c377491b07a1bfa24e54d930b1f72ff Mon Sep 17 00:00:00 2001 From: Vihas Splunk Date: Mon, 4 Sep 2023 19:12:36 +0530 Subject: [PATCH 10/19] add uuid --- pkg/stanza/go.mod | 1 + pkg/stanza/go.sum | 2 ++ 2 files changed, 3 insertions(+) diff --git a/pkg/stanza/go.mod b/pkg/stanza/go.mod index 8fc43edb037b..4bad33ccb3ce 100644 --- a/pkg/stanza/go.mod +++ b/pkg/stanza/go.mod @@ -6,6 +6,7 @@ require ( github.com/antonmedv/expr v1.13.0 github.com/bmatcuk/doublestar/v4 v4.6.0 github.com/cespare/xxhash/v2 v2.2.0 + github.com/google/uuid v1.3.0 github.com/influxdata/go-syslog/v3 v3.0.1-0.20210608084020-ac565dc76ba6 github.com/jpillora/backoff v1.0.0 github.com/json-iterator/go v1.1.12 diff --git a/pkg/stanza/go.sum b/pkg/stanza/go.sum index edd352a4a714..c881104942fe 100644 --- a/pkg/stanza/go.sum +++ b/pkg/stanza/go.sum @@ -112,6 +112,8 @@ github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8 github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod 
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hashicorp/consul/api v1.13.0/go.mod h1:ZlVrynguJKcYr54zGaDbaL3fOvKC9m72FhPvA8T35KQ= From 28979bbc19c1ed0a89f8205d5fe197a81b0cbb69 Mon Sep 17 00:00:00 2001 From: Vihas Splunk Date: Mon, 4 Sep 2023 19:23:48 +0530 Subject: [PATCH 11/19] add comments --- pkg/stanza/fileconsumer/benchmark_test.go | 2 +- pkg/stanza/fileconsumer/file_threadpool.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/stanza/fileconsumer/benchmark_test.go b/pkg/stanza/fileconsumer/benchmark_test.go index 686bfbf01e9a..eecfe5bc1bb3 100644 --- a/pkg/stanza/fileconsumer/benchmark_test.go +++ b/pkg/stanza/fileconsumer/benchmark_test.go @@ -213,7 +213,7 @@ func (fileSize fileSizeBenchmark) createFiles(b *testing.B, rootDir string) { func BenchmarkFileSizeVarying(b *testing.B) { fileSize := fileSizeBenchmark{ - sizes: [2]int{b.N * 5000, b.N * 10}, + sizes: [2]int{b.N * 5000, b.N * 10}, // Half the files will be huge, other half will be smaller } rootDir := b.TempDir() cfg := NewConfig().includeDir(rootDir) diff --git a/pkg/stanza/fileconsumer/file_threadpool.go b/pkg/stanza/fileconsumer/file_threadpool.go index 8cc9bfd02870..cdcbdf3481be 100755 --- a/pkg/stanza/fileconsumer/file_threadpool.go +++ b/pkg/stanza/fileconsumer/file_threadpool.go @@ -15,7 +15,7 @@ import ( type readerWrapper struct { reader *reader trieKey *fingerprint.Fingerprint - close bool // indicate if we should close the file after reading, used when we detect lost readers + close bool // indicate if we should close the file after reading. 
Used when we detect lost readers } func (m *Manager) kickoffThreads(ctx context.Context) { From 89609a64bf6d90150e4078d3025e598c1410a924 Mon Sep 17 00:00:00 2001 From: VihasMakwana <121151420+VihasMakwana@users.noreply.github.com> Date: Thu, 7 Sep 2023 20:08:57 +0530 Subject: [PATCH 12/19] Update pkg/stanza/fileconsumer/file_threadpool.go Co-authored-by: Daniel Jaglowski --- pkg/stanza/fileconsumer/file_threadpool.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/stanza/fileconsumer/file_threadpool.go b/pkg/stanza/fileconsumer/file_threadpool.go index cdcbdf3481be..dc0cf9800d59 100755 --- a/pkg/stanza/fileconsumer/file_threadpool.go +++ b/pkg/stanza/fileconsumer/file_threadpool.go @@ -156,13 +156,11 @@ func (m *Manager) clearOldReadersConcurrent(ctx context.Context) { i := 0 for ; i < len(m.knownFiles); i++ { reader := m.knownFiles[i] - if reader.generation >= 3 { - oldReaders = append(oldReaders, reader) - } else { + if reader.generation < 3 { break } } - m.knownFiles = m.knownFiles[i:] + oldReaders, m.knownFiles := m.knownFiles[:i], m.knownFiles[i:] for _, r := range oldReaders { if m.isCurrentlyConsuming(r.Fingerprint) { From 4f24f25192711164e2c5b20bb630554c71a276f5 Mon Sep 17 00:00:00 2001 From: Vihas Splunk Date: Mon, 11 Sep 2023 14:33:51 +0530 Subject: [PATCH 13/19] fix: improve readability --- pkg/stanza/fileconsumer/config.go | 2 +- pkg/stanza/fileconsumer/file.go | 6 ++--- pkg/stanza/fileconsumer/file_threadpool.go | 30 ++++++++++++---------- pkg/stanza/fileconsumer/util_test.go | 2 +- pkg/stanza/go.mod | 2 +- pkg/stanza/go.sum | 4 +-- 6 files changed, 24 insertions(+), 22 deletions(-) diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index 0fb9ec6a6e9b..f0ed8ed3f544 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -190,7 +190,7 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, fact seenPaths: make(map[string]struct{}, 100), } 
if useThreadPool.IsEnabled() { - manager.readerChan = make(chan readerWrapper, c.MaxConcurrentFiles) + manager.readerChan = make(chan readerEnvelope, c.MaxConcurrentFiles) manager.trie = trie.NewTrie() } return &manager, nil diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index 5d41a611e483..d01595147f4d 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -48,11 +48,11 @@ type Manager struct { currentFps []*fingerprint.Fingerprint // Following fields are used only when useThreadPool is enabled - workerWg sync.WaitGroup knownFilesLock sync.RWMutex + trieLock sync.RWMutex - readerChan chan readerWrapper - trieLock sync.RWMutex + workerWg sync.WaitGroup + readerChan chan readerEnvelope // TRIE - this data structure stores the fingerprint of the files which are currently being consumed trie *trie.Trie diff --git a/pkg/stanza/fileconsumer/file_threadpool.go b/pkg/stanza/fileconsumer/file_threadpool.go index dc0cf9800d59..b778204745b4 100755 --- a/pkg/stanza/fileconsumer/file_threadpool.go +++ b/pkg/stanza/fileconsumer/file_threadpool.go @@ -12,14 +12,14 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" ) -type readerWrapper struct { +type readerEnvelope struct { reader *reader trieKey *fingerprint.Fingerprint close bool // indicate if we should close the file after reading. 
Used when we detect lost readers } func (m *Manager) kickoffThreads(ctx context.Context) { - m.readerChan = make(chan readerWrapper, m.maxBatchFiles*2) + m.readerChan = make(chan readerEnvelope, m.maxBatchFiles*2) for i := 0; i < m.maxBatchFiles; i++ { m.workerWg.Add(1) go m.worker(ctx) @@ -82,7 +82,7 @@ func (m *Manager) worker(ctx context.Context) { // this is a lost reader, close it and release the file descriptor r.Close() } - m.updateTrie(fp, false) + m.trieDelete(fp) } } } @@ -124,8 +124,8 @@ func (m *Manager) consumeConcurrent(ctx context.Context, paths []string) { reader, fp := m.makeReaderConcurrent(path) if reader != nil { // add fingerprint to trie - m.updateTrie(fp, true) - m.readerChan <- readerWrapper{reader: reader, trieKey: fp} + m.triePut(fp) + m.readerChan <- readerEnvelope{reader: reader, trieKey: fp} } } } @@ -136,14 +136,16 @@ func (m *Manager) isCurrentlyConsuming(fp *fingerprint.Fingerprint) bool { return m.trie.HasKey(fp.FirstBytes) } -func (m *Manager) updateTrie(fp *fingerprint.Fingerprint, insert bool) { +func (m *Manager) triePut(fp *fingerprint.Fingerprint) { m.trieLock.Lock() defer m.trieLock.Unlock() - if insert { - m.trie.Put(fp.FirstBytes) - } else { - m.trie.Delete(fp.FirstBytes) - } + m.trie.Put(fp.FirstBytes) +} + +func (m *Manager) trieDelete(fp *fingerprint.Fingerprint) { + m.trieLock.Lock() + defer m.trieLock.Unlock() + m.trie.Delete(fp.FirstBytes) } func (m *Manager) clearOldReadersConcurrent(ctx context.Context) { @@ -160,14 +162,14 @@ func (m *Manager) clearOldReadersConcurrent(ctx context.Context) { break } } - oldReaders, m.knownFiles := m.knownFiles[:i], m.knownFiles[i:] + oldReaders, m.knownFiles = m.knownFiles[:i], m.knownFiles[i:] for _, r := range oldReaders { if m.isCurrentlyConsuming(r.Fingerprint) { r.Close() } else { - m.updateTrie(r.Fingerprint, true) - m.readerChan <- readerWrapper{reader: r, trieKey: r.Fingerprint, close: true} + m.triePut(r.Fingerprint) + m.readerChan <- readerEnvelope{reader: r, trieKey: 
r.Fingerprint, close: true} } } } diff --git a/pkg/stanza/fileconsumer/util_test.go b/pkg/stanza/fileconsumer/util_test.go index 5408ebfabb94..c35203e3aecf 100644 --- a/pkg/stanza/fileconsumer/util_test.go +++ b/pkg/stanza/fileconsumer/util_test.go @@ -99,7 +99,7 @@ func buildTestManager(t testing.TB, cfg *Config, opts ...testManagerOption) (*Ma input, err := cfg.Build(testutil.Logger(t), testEmitFunc(tmc.emitChan)) require.NoError(t, err) if tmc.initializeChannel && useThreadPool.IsEnabled() { - input.readerChan = make(chan readerWrapper, cfg.MaxConcurrentFiles) + input.readerChan = make(chan readerEnvelope, cfg.MaxConcurrentFiles) ctx, cancel := context.WithCancel(context.Background()) input.cancel = cancel for i := 0; i < cfg.MaxConcurrentFiles/2; i++ { diff --git a/pkg/stanza/go.mod b/pkg/stanza/go.mod index 90123f5dd951..45baacef851f 100644 --- a/pkg/stanza/go.mod +++ b/pkg/stanza/go.mod @@ -6,7 +6,7 @@ require ( github.com/antonmedv/expr v1.14.3 github.com/bmatcuk/doublestar/v4 v4.6.0 github.com/cespare/xxhash/v2 v2.2.0 - github.com/google/uuid v1.3.0 + github.com/google/uuid v1.3.1 github.com/influxdata/go-syslog/v3 v3.0.1-0.20210608084020-ac565dc76ba6 github.com/jpillora/backoff v1.0.0 github.com/json-iterator/go v1.1.12 diff --git a/pkg/stanza/go.sum b/pkg/stanza/go.sum index 9f68814e7bca..32f79d9fbed6 100644 --- a/pkg/stanza/go.sum +++ b/pkg/stanza/go.sum @@ -112,8 +112,8 @@ github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8 github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= 
+github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hashicorp/consul/api v1.13.0/go.mod h1:ZlVrynguJKcYr54zGaDbaL3fOvKC9m72FhPvA8T35KQ= From dbc025fed1c175d16d6578d16534f25a35a42498 Mon Sep 17 00:00:00 2001 From: Vihas Splunk Date: Wed, 13 Sep 2023 18:37:04 +0530 Subject: [PATCH 14/19] fix: make benchmark scalable --- pkg/stanza/fileconsumer/benchmark_test.go | 152 +++++++++++++++------- pkg/stanza/fileconsumer/file.go | 1 + 2 files changed, 103 insertions(+), 50 deletions(-) diff --git a/pkg/stanza/fileconsumer/benchmark_test.go b/pkg/stanza/fileconsumer/benchmark_test.go index eecfe5bc1bb3..f18281bf7b34 100644 --- a/pkg/stanza/fileconsumer/benchmark_test.go +++ b/pkg/stanza/fileconsumer/benchmark_test.go @@ -26,7 +26,10 @@ type fileInputBenchmark struct { } type fileSizeBenchmark struct { - sizes [2]int + logs []int + maxConcurrent int + name string + desiredFiles int } type benchFile struct { @@ -190,65 +193,114 @@ func BenchmarkFileInput(b *testing.B) { } } -func max(x, y int) int { - if x < y { - return y +func max(nums ...int) int { + _max := 0 + for _, n := range nums { + if _max < n { + _max = n + } } - return x + return _max } -func (fileSize fileSizeBenchmark) createFiles(b *testing.B, rootDir string) { - // create 50 files, some with large file sizes, other's with rather small +func (fileSize fileSizeBenchmark) createFiles(b *testing.B, rootDir string) int { + + // the number of logs written to a file is selected in round-robin fashion from fileSize.logs + // eg. 
fileSize.logs = [10,100] + // It will create one half of files with 10*b.N lines and other with 100*b.N lines + // ileSize.logs = [10,100,500] + // It will create one third of files with 10*b.N lines and other with 100*b.N and remaining third with 500*b.N + getMessage := func(m int) string { return fmt.Sprintf("message %d", m) } - logs := make([]string, 0) - for i := 0; i < max(fileSize.sizes[0], fileSize.sizes[1]); i++ { + logs := make([]string, 0, b.N) // collect all the logs at beginning itself to and reuse same to write to files + for i := 0; i < max(fileSize.logs...)*b.N; i++ { logs = append(logs, getMessage(i)) } - - for i := 0; i < 50; i++ { + totalLogs := 0 + for i := 0; i < fileSize.desiredFiles; i++ { file := openFile(b, filepath.Join(rootDir, fmt.Sprintf("file_%s.log", uuid.NewString()))) - file.WriteString(uuid.NewString() + strings.Join(logs[:fileSize.sizes[i%2]], "\n") + "\n") + // Use uuid.NewString() to introduce some randomness in file logs + // or else file consumer will detect a duplicate based on fingerprint + linesToWrite := b.N * fileSize.logs[i%len(fileSize.logs)] + file.WriteString(uuid.NewString() + strings.Join(logs[:linesToWrite], "\n") + "\n") + totalLogs += linesToWrite } + return totalLogs } func BenchmarkFileSizeVarying(b *testing.B) { - fileSize := fileSizeBenchmark{ - sizes: [2]int{b.N * 5000, b.N * 10}, // Half the files will be huge, other half will be smaller + testCases := []fileSizeBenchmark{ + { + name: "varying_sizes", + logs: []int{10, 1000}, + maxConcurrent: 50, + desiredFiles: 100, + }, + { + name: "varying_sizes_more_files", + logs: []int{10, 1000}, + maxConcurrent: 50, + desiredFiles: 200, + }, + { + name: "varying_sizes_more_files_throttled", + logs: []int{10, 1000}, + maxConcurrent: 30, + desiredFiles: 200, + }, + { + name: "same_size_small", + logs: []int{10}, + maxConcurrent: 50, + desiredFiles: 50, + }, + { + name: "same_size_small_throttled", + logs: []int{10}, + maxConcurrent: 10, + desiredFiles: 100, + }, + } + 
for _, fileSize := range testCases { + b.Run(fileSize.name, func(b *testing.B) { + rootDir := b.TempDir() + cfg := NewConfig().includeDir(rootDir) + cfg.StartAt = "beginning" + cfg.MaxConcurrentFiles = fileSize.maxConcurrent + emitCalls := make(chan *emitParams, max(fileSize.logs...)*b.N) // large enough to hold all the logs + + operator, _ := buildTestManager(b, cfg, withEmitChan(emitCalls), withReaderChan()) + operator.persister = testutil.NewMockPersister("test") + defer func() { + require.NoError(b, operator.Stop()) + }() + + // create first set of files + totalLogs := fileSize.createFiles(b, rootDir) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + var once sync.Once + // wait for logs from two poll() cycles + for i := 0; i < totalLogs*2; i++ { + once.Do(func() { + // Reset once we get the first log + b.ResetTimer() + }) + <-emitCalls + } + // Stop the timer, as we're measuring log throughput + b.StopTimer() + }() + operator.poll(context.Background()) + + // create new set of files, call poll() again + fileSize.createFiles(b, rootDir) + operator.poll(context.Background()) + + wg.Wait() + }) + } - rootDir := b.TempDir() - cfg := NewConfig().includeDir(rootDir) - cfg.StartAt = "beginning" - cfg.MaxConcurrentFiles = 50 - totalLogs := fileSize.sizes[0]*50 + fileSize.sizes[1]*50 - emitCalls := make(chan *emitParams, totalLogs+10) - - operator, _ := buildTestManager(b, cfg, withEmitChan(emitCalls), withReaderChan()) - operator.persister = testutil.NewMockPersister("test") - defer func() { - require.NoError(b, operator.Stop()) - }() - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - var once sync.Once - for i := 0; i < totalLogs; i++ { - once.Do(func() { - // Reset once we get the first log - b.ResetTimer() - }) - <-emitCalls - } - // Stop the timer, as we're measuring log throughput - b.StopTimer() - }() - // create first set of files - fileSize.createFiles(b, rootDir) - operator.poll(context.Background()) - - // create new 
set of files, call poll() again - fileSize.createFiles(b, rootDir) - operator.poll(context.Background()) - - wg.Wait() } diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index 050dce070fa1..b0d6ad01a8f4 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -115,6 +115,7 @@ func (m *Manager) startPoller(ctx context.Context) { return case <-globTicker.C: } + m.poll(ctx) } }() From 89bb07a874e87e6e5eddd3b4ece077960c0e9e52 Mon Sep 17 00:00:00 2001 From: Vihas Splunk Date: Sat, 16 Sep 2023 05:29:06 +0530 Subject: [PATCH 15/19] fix: respect max_concurrent and fix copy truncate --- pkg/stanza/fileconsumer/config.go | 2 +- pkg/stanza/fileconsumer/file_test.go | 9 +- pkg/stanza/fileconsumer/file_threadpool.go | 97 ++++++++++++++++++---- pkg/stanza/fileconsumer/rotation_test.go | 2 - pkg/stanza/fileconsumer/util_test.go | 2 +- 5 files changed, 84 insertions(+), 28 deletions(-) diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index 90400f25c146..d26c92f2e10e 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -196,7 +196,7 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, fact seenPaths: make(map[string]struct{}, 100), } if useThreadPool.IsEnabled() { - manager.readerChan = make(chan readerEnvelope, c.MaxConcurrentFiles) + manager.readerChan = make(chan readerEnvelope, c.MaxConcurrentFiles/2) manager.trie = trie.NewTrie() } return &manager, nil diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index 59546fa8938f..59912a20d565 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ b/pkg/stanza/fileconsumer/file_test.go @@ -981,9 +981,6 @@ func TestManyLogsDelivered(t *testing.T) { func TestFileBatching(t *testing.T) { t.Parallel() - if useThreadPool.IsEnabled() { - t.Skip(`Skipping for thread pool feature gate, as there's no concept of batching for thread pool`) - } files := 100 
linesPerFile := 10 @@ -1371,9 +1368,6 @@ func TestDeleteAfterRead(t *testing.T) { } func TestMaxBatching(t *testing.T) { - if useThreadPool.IsEnabled() { - t.Skip(`Skipping for thread pool feature gate, as there's no concept of batching for thread pool`) - } t.Parallel() files := 50 @@ -1551,6 +1545,9 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) { // Stop consuming before long file has been fully consumed cancel() operator.cancel() + if operator.readerChan != nil { + close(operator.readerChan) + } operator.workerWg.Wait() wg.Wait() diff --git a/pkg/stanza/fileconsumer/file_threadpool.go b/pkg/stanza/fileconsumer/file_threadpool.go index 195c19afd008..3f0c872e1539 100755 --- a/pkg/stanza/fileconsumer/file_threadpool.go +++ b/pkg/stanza/fileconsumer/file_threadpool.go @@ -5,7 +5,9 @@ package fileconsumer // import "github.com/open-telemetry/opentelemetry-collecto import ( "context" + "io" "os" + "sync" "go.uber.org/zap" @@ -15,11 +17,10 @@ import ( type readerEnvelope struct { reader *reader trieKey *fingerprint.Fingerprint - close bool // indicate if we should close the file after reading. 
Used when we detect lost readers } func (m *Manager) kickoffThreads(ctx context.Context) { - m.readerChan = make(chan readerEnvelope, m.maxBatchFiles*2) + m.readerChan = make(chan readerEnvelope, m.maxBatchFiles) for i := 0; i < m.maxBatchFiles; i++ { m.workerWg.Add(1) go m.worker(ctx) @@ -48,14 +49,25 @@ func (m *Manager) pollConcurrent(ctx context.Context) { m.knownFiles[i].generation++ } m.knownFilesLock.Unlock() - + batchesProcessed := 0 // Get the list of paths on disk matches, err := m.fileMatcher.MatchFiles() if err != nil { m.Errorf("error finding files: %s", err) } + for len(matches) > m.maxBatchFiles { + m.consumeConcurrent(ctx, matches[:m.maxBatchFiles]) + + if m.maxBatches != 0 { + batchesProcessed++ + if batchesProcessed >= m.maxBatches { + return + } + } + + matches = matches[m.maxBatchFiles:] + } m.consumeConcurrent(ctx, matches) - m.clearCurrentFingerprints() // Any new files that appear should be consumed entirely m.readerFactory.fromBeginning = true @@ -66,8 +78,6 @@ func (m *Manager) worker(ctx context.Context) { defer m.workerWg.Done() for { select { - case <-ctx.Done(): - return case wrapper, ok := <-m.readerChan: if !ok { return @@ -76,14 +86,11 @@ func (m *Manager) worker(ctx context.Context) { r.ReadToEnd(ctx) if m.deleteAfterRead && r.eof { r.Delete() - } else if !wrapper.close { + } else { // Save off any files that were not fully read. 
m.knownFilesLock.Lock() m.knownFiles = append(m.knownFiles, r) m.knownFilesLock.Unlock() - } else { - // this is a lost reader, close it and release the file descriptor - r.Close() } m.trieDelete(fp) } @@ -122,6 +129,7 @@ func (m *Manager) makeReaderConcurrent(filePath string) (*reader, *fingerprint.F } func (m *Manager) consumeConcurrent(ctx context.Context, paths []string) { + m.Debug("Consuming files") m.clearOldReadersConcurrent(ctx) for _, path := range paths { reader, fp := m.makeReaderConcurrent(path) @@ -131,6 +139,7 @@ func (m *Manager) consumeConcurrent(ctx context.Context, paths []string) { m.readerChan <- readerEnvelope{reader: reader, trieKey: fp} } } + m.clearCurrentFingerprints() } func (m *Manager) isCurrentlyConsuming(fp *fingerprint.Fingerprint) bool { @@ -157,24 +166,34 @@ func (m *Manager) clearOldReadersConcurrent(ctx context.Context) { // Clear out old readers. They are sorted such that they are oldest first, // so we can just find the first reader whose poll cycle is less than our // limit i.e. last 3 cycles, and keep every reader after that - oldReaders := make([]*reader, 0) i := 0 for ; i < len(m.knownFiles); i++ { reader := m.knownFiles[i] - if reader.generation < 3 { + if reader.generation <= 3 { break } } - oldReaders, m.knownFiles = m.knownFiles[:i], m.knownFiles[i:] - - for _, r := range oldReaders { - if m.isCurrentlyConsuming(r.Fingerprint) { + m.knownFiles = m.knownFiles[i:] + var wg sync.WaitGroup + for _, r := range m.knownFiles { + if r.file == nil { + // already closed + continue + } + if m.checkTruncate(r) { + // if it's an updated version, we don't to readToEnd, it will cause duplicates. 
+			// current poll() will take care of reading
 			r.Close()
 		} else {
-			m.triePut(r.Fingerprint)
-			m.readerChan <- readerEnvelope{reader: r, trieKey: r.Fingerprint, close: true}
+			wg.Add(1)
+			go func(r *reader) {
+				defer wg.Done()
+				r.ReadToEnd(ctx)
+				r.Close()
+			}(r)
 		}
 	}
+	wg.Wait()
 }
 
 func (m *Manager) newReaderConcurrent(file *os.File, fp *fingerprint.Fingerprint) (*reader, error) {
@@ -202,3 +221,45 @@ func (m *Manager) syncLastPollFilesConcurrent(ctx context.Context) {
 
 	m.syncLastPollFiles(ctx)
 }
+
+// check if current file is a different file after copy-truncate
+func (m *Manager) checkTruncate(r *reader) bool {
+	/*
+		Suppose file.log with content "ABCDEG" gets truncated to "".
+		But it has the cursor at position "5" (i.e. 'G') in memory.
+
+		If the updated file.log has its content "QWERTYXYZ",
+		next call to read() on previously opened file will return "XYZ".
+		This is undesirable and will cause duplication.
+
+		NOTE: we haven't closed the previously opened file
+		Check if it's an updated version of the previously opened file.
+ */ + + // store current offset + oldOffset := r.Offset + oldCursor, err := r.file.Seek(0, 1) + if err != nil { + m.Errorw("Failed to seek", err) + return false + } + + r.file.Seek(0, 0) + new := make([]byte, r.fingerprintSize) + n, err := r.file.Read(new) + if err != nil && err != io.EOF { + m.Errorw("Failed to read", err) + return false + } + + // restore the offset in case if it's not a truncate + r.Offset = oldOffset + _, err = r.file.Seek(oldCursor, 0) + if err != nil { + m.Errorw("Failed to seek", err) + return false + } + + newFp := fingerprint.Fingerprint{FirstBytes: new[:n]} + return !newFp.StartsWith(r.Fingerprint) +} diff --git a/pkg/stanza/fileconsumer/rotation_test.go b/pkg/stanza/fileconsumer/rotation_test.go index 897596293e3a..860a32101da6 100644 --- a/pkg/stanza/fileconsumer/rotation_test.go +++ b/pkg/stanza/fileconsumer/rotation_test.go @@ -418,8 +418,6 @@ func TestTrackMovedAwayFiles(t *testing.T) { require.NoError(t, err) writeString(t, movedFile, "testlog2\n") operator.poll(context.Background()) - operator.poll(context.Background()) - operator.poll(context.Background()) waitForToken(t, emitCalls, []byte("testlog2")) } diff --git a/pkg/stanza/fileconsumer/util_test.go b/pkg/stanza/fileconsumer/util_test.go index c35203e3aecf..32e00888da6a 100644 --- a/pkg/stanza/fileconsumer/util_test.go +++ b/pkg/stanza/fileconsumer/util_test.go @@ -99,7 +99,7 @@ func buildTestManager(t testing.TB, cfg *Config, opts ...testManagerOption) (*Ma input, err := cfg.Build(testutil.Logger(t), testEmitFunc(tmc.emitChan)) require.NoError(t, err) if tmc.initializeChannel && useThreadPool.IsEnabled() { - input.readerChan = make(chan readerEnvelope, cfg.MaxConcurrentFiles) + input.readerChan = make(chan readerEnvelope, cfg.MaxConcurrentFiles/2) ctx, cancel := context.WithCancel(context.Background()) input.cancel = cancel for i := 0; i < cfg.MaxConcurrentFiles/2; i++ { From cae32901f8cad9dabae455b3164f89b6e84043e9 Mon Sep 17 00:00:00 2001 From: Vihas Splunk Date: Sat, 16 
Sep 2023 13:52:03 +0530 Subject: [PATCH 16/19] fix: reduce redundancy test cases --- pkg/stanza/fileconsumer/benchmark_test.go | 2 +- pkg/stanza/fileconsumer/file.go | 5 ++++- pkg/stanza/fileconsumer/file_test.go | 27 +++++++++++++---------- pkg/stanza/fileconsumer/rotation_test.go | 8 +++---- pkg/stanza/fileconsumer/util_test.go | 19 +++++----------- 5 files changed, 29 insertions(+), 32 deletions(-) diff --git a/pkg/stanza/fileconsumer/benchmark_test.go b/pkg/stanza/fileconsumer/benchmark_test.go index f18281bf7b34..51b600f8bc0e 100644 --- a/pkg/stanza/fileconsumer/benchmark_test.go +++ b/pkg/stanza/fileconsumer/benchmark_test.go @@ -269,7 +269,7 @@ func BenchmarkFileSizeVarying(b *testing.B) { cfg.MaxConcurrentFiles = fileSize.maxConcurrent emitCalls := make(chan *emitParams, max(fileSize.logs...)*b.N) // large enough to hold all the logs - operator, _ := buildTestManager(b, cfg, withEmitChan(emitCalls), withReaderChan()) + operator, _ := buildTestManager(b, cfg, withEmitChan(emitCalls)) operator.persister = testutil.NewMockPersister("test") defer func() { require.NoError(b, operator.Stop()) diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index c62cc104cf59..b1ffda689151 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -50,6 +50,7 @@ type Manager struct { // Following fields are used only when useThreadPool is enabled knownFilesLock sync.RWMutex trieLock sync.RWMutex + once sync.Once workerWg sync.WaitGroup readerChan chan readerEnvelope @@ -74,7 +75,9 @@ func (m *Manager) Start(persister operator.Persister) error { // If useThreadPool is enabled, kick off the worker threads if useThreadPool.IsEnabled() { - m.kickoffThreads(ctx) + m.once.Do(func() { + m.kickoffThreads(ctx) + }) } // Start polling goroutine diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index 59912a20d565..8fd1f89a9b90 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ 
b/pkg/stanza/fileconsumer/file_test.go @@ -93,10 +93,10 @@ func TestAddFileFields(t *testing.T) { // AddFileResolvedFields tests that the `log.file.name_resolved` and `log.file.path_resolved` fields are included // when IncludeFileNameResolved and IncludeFilePathResolved are set to true func TestAddFileResolvedFields(t *testing.T) { - t.Parallel() if runtime.GOOS == windowsOS { t.Skip("Windows symlinks usage disabled for now. See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21088") } + t.Parallel() tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) @@ -460,7 +460,7 @@ func TestReadNewLogs(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg, withReaderChan()) + operator, emitCalls := buildTestManager(t, cfg) operator.persister = testutil.NewMockPersister("test") // Poll once so we know this isn't a new file @@ -488,7 +488,7 @@ func TestReadExistingAndNewLogs(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg, withReaderChan()) + operator, emitCalls := buildTestManager(t, cfg) operator.persister = testutil.NewMockPersister("test") // Start with a file with an entry in it, and expect that entry @@ -512,7 +512,7 @@ func TestStartAtEnd(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) - operator, emitCalls := buildTestManager(t, cfg, withReaderChan()) + operator, emitCalls := buildTestManager(t, cfg) operator.persister = testutil.NewMockPersister("test") temp := openTemp(t, tempDir) @@ -540,7 +540,7 @@ func TestStartAtEndNewFile(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg, withReaderChan()) + operator, emitCalls := buildTestManager(t, cfg) operator.persister = testutil.NewMockPersister("test") 
operator.poll(context.Background()) @@ -656,7 +656,7 @@ func TestSplitWrite(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg, withReaderChan()) + operator, emitCalls := buildTestManager(t, cfg) operator.persister = testutil.NewMockPersister("test") temp := openTemp(t, tempDir) @@ -676,7 +676,7 @@ func TestIgnoreEmptyFiles(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg, withReaderChan()) + operator, emitCalls := buildTestManager(t, cfg) operator.persister = testutil.NewMockPersister("test") temp := openTemp(t, tempDir) @@ -997,7 +997,7 @@ func TestFileBatching(t *testing.T) { cfg.MaxConcurrentFiles = maxConcurrentFiles cfg.MaxBatches = maxBatches emitCalls := make(chan *emitParams, files*linesPerFile) - operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls), withReaderChan()) + operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls)) operator.persister = testutil.NewMockPersister("test") core, observedLogs := observer.New(zap.DebugLevel) @@ -1353,7 +1353,7 @@ func TestDeleteAfterRead(t *testing.T) { cfg.StartAt = "beginning" cfg.DeleteAfterRead = true emitCalls := make(chan *emitParams, totalLines) - operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls), withReaderChan()) + operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls)) operator.persister = testutil.NewMockPersister("test") operator.poll(context.Background()) @@ -1385,7 +1385,7 @@ func TestMaxBatching(t *testing.T) { cfg.MaxConcurrentFiles = maxConcurrentFiles cfg.MaxBatches = maxBatches emitCalls := make(chan *emitParams, files*linesPerFile) - operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls), withReaderChan()) + operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls)) operator.persister = testutil.NewMockPersister("test") core, observedLogs 
:= observer.New(zap.DebugLevel) @@ -1502,7 +1502,7 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) { cfg.StartAt = "beginning" cfg.DeleteAfterRead = true emitCalls := make(chan *emitParams, longFileLines+1) - operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls), withReaderChan()) + operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls)) operator.persister = testutil.NewMockPersister("test") shortFile := openTemp(t, tempDir) @@ -1544,6 +1544,9 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) { // Stop consuming before long file has been fully consumed cancel() + + // Stop the worker threads + // Following section is a no-op if feature gate is disabled, so no need to check explicitly operator.cancel() if operator.readerChan != nil { close(operator.readerChan) @@ -1661,7 +1664,7 @@ func TestStalePartialFingerprintDiscarded(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.FingerprintSize = 18 cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg, withReaderChan()) + operator, emitCalls := buildTestManager(t, cfg) operator.persister = testutil.NewMockPersister("test") // Both of they will be include diff --git a/pkg/stanza/fileconsumer/rotation_test.go b/pkg/stanza/fileconsumer/rotation_test.go index 860a32101da6..5c02fddd86e4 100644 --- a/pkg/stanza/fileconsumer/rotation_test.go +++ b/pkg/stanza/fileconsumer/rotation_test.go @@ -357,7 +357,7 @@ func TestMoveFile(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg, withReaderChan()) + operator, emitCalls := buildTestManager(t, cfg) operator.persister = testutil.NewMockPersister("test") temp1 := openTemp(t, tempDir) @@ -389,7 +389,7 @@ func TestTrackMovedAwayFiles(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg, withReaderChan()) + operator, emitCalls := 
buildTestManager(t, cfg) operator.persister = testutil.NewMockPersister("test") temp1 := openTemp(t, tempDir) @@ -474,7 +474,7 @@ func TestTruncateThenWrite(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg, withReaderChan()) + operator, emitCalls := buildTestManager(t, cfg) operator.persister = testutil.NewMockPersister("test") temp1 := openTemp(t, tempDir) @@ -511,7 +511,7 @@ func TestCopyTruncateWriteBoth(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg, withReaderChan()) + operator, emitCalls := buildTestManager(t, cfg) operator.persister = testutil.NewMockPersister("test") temp1 := openTemp(t, tempDir) diff --git a/pkg/stanza/fileconsumer/util_test.go b/pkg/stanza/fileconsumer/util_test.go index 32e00888da6a..603d4e72480f 100644 --- a/pkg/stanza/fileconsumer/util_test.go +++ b/pkg/stanza/fileconsumer/util_test.go @@ -73,8 +73,7 @@ type emitParams struct { } type testManagerConfig struct { - emitChan chan *emitParams - initializeChannel bool + emitChan chan *emitParams } type testManagerOption func(*testManagerConfig) @@ -85,12 +84,6 @@ func withEmitChan(emitChan chan *emitParams) testManagerOption { } } -func withReaderChan() testManagerOption { - return func(c *testManagerConfig) { - c.initializeChannel = true - } -} - func buildTestManager(t testing.TB, cfg *Config, opts ...testManagerOption) (*Manager, chan *emitParams) { tmc := &testManagerConfig{emitChan: make(chan *emitParams, 100)} for _, opt := range opts { @@ -98,14 +91,12 @@ func buildTestManager(t testing.TB, cfg *Config, opts ...testManagerOption) (*Ma } input, err := cfg.Build(testutil.Logger(t), testEmitFunc(tmc.emitChan)) require.NoError(t, err) - if tmc.initializeChannel && useThreadPool.IsEnabled() { - input.readerChan = make(chan readerEnvelope, cfg.MaxConcurrentFiles/2) + if 
useThreadPool.IsEnabled() { ctx, cancel := context.WithCancel(context.Background()) input.cancel = cancel - for i := 0; i < cfg.MaxConcurrentFiles/2; i++ { - input.workerWg.Add(1) - go input.worker(ctx) - } + input.once.Do(func() { + input.kickoffThreads(ctx) + }) } return input, tmc.emitChan } From 506cefa1f21cc2c0672d3cb8dae0409252e9b054 Mon Sep 17 00:00:00 2001 From: Vihas Splunk Date: Mon, 18 Sep 2023 22:23:04 +0530 Subject: [PATCH 17/19] chore: rename functions --- pkg/stanza/fileconsumer/file.go | 4 ++-- pkg/stanza/fileconsumer/file_threadpool.go | 9 ++++++--- pkg/stanza/fileconsumer/util_test.go | 2 +- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index b1ffda689151..2f86ae4121ad 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -76,7 +76,7 @@ func (m *Manager) Start(persister operator.Persister) error { // If useThreadPool is enabled, kick off the worker threads if useThreadPool.IsEnabled() { m.once.Do(func() { - m.kickoffThreads(ctx) + m.startConsumers(ctx) }) } @@ -91,7 +91,7 @@ func (m *Manager) Stop() error { m.cancel() m.wg.Wait() if useThreadPool.IsEnabled() { - m.shutdownThreads() + m.stopConsumers() } m.roller.cleanup() diff --git a/pkg/stanza/fileconsumer/file_threadpool.go b/pkg/stanza/fileconsumer/file_threadpool.go index 3f0c872e1539..d783c5b7ca78 100755 --- a/pkg/stanza/fileconsumer/file_threadpool.go +++ b/pkg/stanza/fileconsumer/file_threadpool.go @@ -19,7 +19,8 @@ type readerEnvelope struct { trieKey *fingerprint.Fingerprint } -func (m *Manager) kickoffThreads(ctx context.Context) { +// startConsumers starts a given number of goroutines consuming items from the channel +func (m *Manager) startConsumers(ctx context.Context) { m.readerChan = make(chan readerEnvelope, m.maxBatchFiles) for i := 0; i < m.maxBatchFiles; i++ { m.workerWg.Add(1) @@ -27,17 +28,19 @@ func (m *Manager) kickoffThreads(ctx context.Context) { } } -func (m 
*Manager) shutdownThreads() { +// stopConsumers closes the channel created during startConsumers, wait for the consumers to finish execution +// and saves any files left +func (m *Manager) stopConsumers() { if m.readerChan != nil { close(m.readerChan) } m.workerWg.Wait() // save off any files left // As we already cancelled our current context, create a new one to save any left offsets - // This is only applicable for `filelog.useThreadPool` featuregate ctx, cancel := context.WithCancel(context.Background()) m.syncLastPollFilesConcurrent(ctx) cancel() + m.once = sync.Once{} } // poll checks all the watched paths for new entries diff --git a/pkg/stanza/fileconsumer/util_test.go b/pkg/stanza/fileconsumer/util_test.go index 603d4e72480f..6ccfda41e4cd 100644 --- a/pkg/stanza/fileconsumer/util_test.go +++ b/pkg/stanza/fileconsumer/util_test.go @@ -95,7 +95,7 @@ func buildTestManager(t testing.TB, cfg *Config, opts ...testManagerOption) (*Ma ctx, cancel := context.WithCancel(context.Background()) input.cancel = cancel input.once.Do(func() { - input.kickoffThreads(ctx) + input.startConsumers(ctx) }) } return input, tmc.emitChan From 99e507cfdba634ab5b93090287addae7948f05a6 Mon Sep 17 00:00:00 2001 From: Vihas Splunk Date: Thu, 21 Sep 2023 22:59:45 +0530 Subject: [PATCH 18/19] chore: rework checkTruncate --- pkg/stanza/fileconsumer/file_threadpool.go | 26 ++-------------------- 1 file changed, 2 insertions(+), 24 deletions(-) diff --git a/pkg/stanza/fileconsumer/file_threadpool.go b/pkg/stanza/fileconsumer/file_threadpool.go index d783c5b7ca78..a6de938d8a7b 100755 --- a/pkg/stanza/fileconsumer/file_threadpool.go +++ b/pkg/stanza/fileconsumer/file_threadpool.go @@ -5,7 +5,6 @@ package fileconsumer // import "github.com/open-telemetry/opentelemetry-collecto import ( "context" - "io" "os" "sync" @@ -239,30 +238,9 @@ func (m *Manager) checkTruncate(r *reader) bool { Check if it's updated version of previously opened file. 
*/ - // store current offset - oldOffset := r.Offset - oldCursor, err := r.file.Seek(0, 1) + refreshedFp, err := fingerprint.New(r.file, r.fingerprintSize) if err != nil { - m.Errorw("Failed to seek", err) return false } - - r.file.Seek(0, 0) - new := make([]byte, r.fingerprintSize) - n, err := r.file.Read(new) - if err != nil && err != io.EOF { - m.Errorw("Failed to read", err) - return false - } - - // restore the offset in case if it's not a truncate - r.Offset = oldOffset - _, err = r.file.Seek(oldCursor, 0) - if err != nil { - m.Errorw("Failed to seek", err) - return false - } - - newFp := fingerprint.Fingerprint{FirstBytes: new[:n]} - return !newFp.StartsWith(r.Fingerprint) + return !refreshedFp.StartsWith(r.Fingerprint) } From c822f28c87ea83f47ce7935eb1f1bb9a34374f24 Mon Sep 17 00:00:00 2001 From: Vihas Splunk Date: Sat, 23 Sep 2023 20:01:02 +0530 Subject: [PATCH 19/19] fix: race conditions detected during pipeline run --- pkg/stanza/fileconsumer/file.go | 2 +- pkg/stanza/fileconsumer/file_test.go | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index 2f86ae4121ad..1246c3e34380 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -73,7 +73,7 @@ func (m *Manager) Start(persister operator.Persister) error { m.Warnf("finding files: %v", err) } - // If useThreadPool is enabled, kick off the worker threads + // If useThreadPool is enabled, start the worker threads if useThreadPool.IsEnabled() { m.once.Do(func() { m.startConsumers(ctx) diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index 738e63996537..83150ac4bbea 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ b/pkg/stanza/fileconsumer/file_test.go @@ -1589,7 +1589,7 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) { cancel() // Stop the worker threads - // Following section is a no-op if feature gate is disabled, so no need to check 
explicitly + // Following section is a no-op if threadpool feature gate is disabled, so no need to check explicitly operator.cancel() if operator.readerChan != nil { close(operator.readerChan) @@ -1678,7 +1678,9 @@ func TestHeaderPersistanceInHeader(t *testing.T) { op1.poll(context.Background()) // for threadpool, as the poll is asynchronous, allow it to complete one poll cycle - time.Sleep(500 * time.Millisecond) + if useThreadPool.IsEnabled() { + time.Sleep(500 * time.Millisecond) + } require.NoError(t, op1.Stop())