Don't use bufio.Scanner to read the log
While using the scanner is convenient because it abstracts away EOL handling,
it forces the concept of tokens. A token has to fit within the scanner's
buffer size, so when a log contains long runs of text with no newlines,
the token overflows the buffer and the program fails.
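
For illustration only (not part of this repository), a minimal sketch of the
failure mode: give bufio.Scanner a single line larger than its maximum token
size, and Scan stops with bufio.ErrTooLong.

package main

import (
	"bufio"
	"fmt"
	"strings"
)

func main() {
	// One 2 MiB "line" with no newline in it.
	input := strings.NewReader(strings.Repeat("x", 2*1024*1024))

	scanner := bufio.NewScanner(input)
	// Cap the token size at 1 MiB, mirroring the old code's scanner.Buffer call.
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)

	for scanner.Scan() {
		// Never reached: the 2 MiB line cannot fit into a 1 MiB token.
	}
	fmt.Println(scanner.Err()) // bufio.Scanner: token too long
}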

This commit replaces the use of bufio.Scanner with bufio.Reader.
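
By way of contrast, a minimal standalone sketch (reading from an in-memory
string rather than a log file) of the bufio.Reader pattern adopted here:
ReadBytes('\n') returns a line of any length, and the final unterminated
line is delivered together with io.EOF.

package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

func main() {
	data := "short line\n" + strings.Repeat("x", 2*1024*1024) // long tail, no trailing newline
	reader := bufio.NewReader(strings.NewReader(data))

	for {
		line, err := reader.ReadBytes('\n')
		if err != nil && err != io.EOF {
			panic(err) // a real I/O error, not end of input
		}
		if len(line) > 0 {
			fmt.Println(len(line)) // 11, then 2097152: no token-size limit
		}
		if err == io.EOF {
			return
		}
	}
}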

Signed-off-by: Eric Chlebek <[email protected]>
echlebek committed Dec 7, 2023
1 parent 7992e8c commit 8ac510f
Showing 1 changed file with 25 additions and 33 deletions.
analyzer.go: 58 changes (25 additions & 33 deletions)
@@ -6,6 +6,7 @@ import (
 	"io"
 	"io/ioutil"
 	"sync"
+	"sync/atomic"
 )
 
 const bufSize = 1000
@@ -51,10 +52,10 @@ func NewDiscardWriter() *DiscardWriter {
 type AnalyzerFunc func([]byte) *Result
 
 type Result struct {
-	Path string `json:"path"`
-	Match string `json:"match,omitempty"`
-	Err error `json:"error,omitempty"`
-	Offset int64 `json:"offset"`
+	Path   string `json:"path"`
+	Match  string `json:"match,omitempty"`
+	Err    error  `json:"error,omitempty"`
+	Offset int64  `json:"offset"`
 }
 
 type LineMsg struct {
@@ -77,46 +78,37 @@ func (a *Analyzer) Go(ctx context.Context) <-chan Result {
 }
 
 func (a *Analyzer) BytesRead() int64 {
-	a.wg.Wait()
-	return a.bytesRead
+	return atomic.LoadInt64(&a.bytesRead)
 }
 
 func (a *Analyzer) startProducer(ctx context.Context) <-chan LineMsg {
 	logLines := make(chan LineMsg, bufSize)
 	currentOffset := a.Offset
 	reader := bufio.NewReaderSize(a.Log, 32*1024*1024)
-	discard := NewDiscardWriter()
-	teeReader := io.TeeReader(reader, discard)
-	scanner := bufio.NewScanner(teeReader)
-	buf := make([]byte, 0, 64*1024)
-	scanner.Buffer(buf, 1024*1024)
-	var wg sync.WaitGroup
-	wg.Add(1)
+	a.wg.Add(1)
 	go func() {
-		defer wg.Done()
-		for scanner.Scan() {
-			line := scanner.Bytes()
-			// Copy the line, since the scanner can reclaim it
-			lineCopy := make([]byte, len(line))
-			copy(lineCopy, line)
-			select {
-			case <-ctx.Done():
-				close(logLines)
-				return
-			case logLines <- LineMsg{Line: lineCopy, Offset: currentOffset}:
-				currentOffset += int64(len(line))
-			}
-		}
-		if err := scanner.Err(); err != nil {
-			fatal("error while scanning log: %s :: %s\n", a.Path, err)
-		}
-		close(logLines)
-	}()
-	go func() {
-		defer a.wg.Done()
-		wg.Wait()
-		a.bytesRead = discard.BytesRead
+		defer a.wg.Done()
+		defer close(logLines)
+		for {
+			line, err := reader.ReadBytes('\n')
+			if err != nil && err != io.EOF {
+				fatal("error while scanning log: %s: %s\n", a.Path, err)
+			}
+			atomic.AddInt64(&a.bytesRead, int64(len(line)))
+			if len(line) > 0 {
+				select {
+				case <-ctx.Done():
+					return
+				case logLines <- LineMsg{Line: line, Offset: currentOffset}:
+					currentOffset += int64(len(line))
+				}
+				if err == io.EOF {
+					return
+				}
+			} else {
+				return
+			}
+		}
 	}()
 	return logLines
 }