Skip to content

Commit

Permalink
Change to NewStreamParser to accept larger inputs from scanner (influ…
Browse files Browse the repository at this point in the history
…xdata#8892)

* change to NewStreamParser to accept larger inputs from scanner

* fmt changes
  • Loading branch information
tootedom authored Apr 13, 2021
1 parent 5f26582 commit f3229f5
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 19 deletions.
26 changes: 14 additions & 12 deletions plugins/common/shim/processor.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package shim

import (
"bufio"
"fmt"
"sync"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/agent"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/processors"
)

Expand Down Expand Up @@ -37,12 +36,7 @@ func (s *Shim) RunProcessor() error {
acc := agent.NewAccumulator(s, s.metricCh)
acc.SetPrecision(time.Nanosecond)

parser, err := parsers.NewInfluxParser()
if err != nil {
return fmt.Errorf("Failed to create new parser: %w", err)
}

err = s.Processor.Start(acc)
err := s.Processor.Start(acc)
if err != nil {
return fmt.Errorf("failed to start processor: %w", err)
}
Expand All @@ -54,13 +48,21 @@ func (s *Shim) RunProcessor() error {
wg.Done()
}()

scanner := bufio.NewScanner(s.stdin)
for scanner.Scan() {
m, err := parser.ParseLine(scanner.Text())
parser := influx.NewStreamParser(s.stdin)
for {
m, err := parser.Next()
if err != nil {
fmt.Fprintf(s.stderr, "Failed to parse metric: %s\b", err)
if err == influx.EOF {
break // stream ended
}
if parseErr, isParseError := err.(*influx.ParseError); isParseError {
fmt.Fprintf(s.stderr, "Failed to parse metric: %s\b", parseErr)
continue
}
fmt.Fprintf(s.stderr, "Failure during reading stdin: %s\b", err)
continue
}

s.Processor.Add(m, acc)
}

Expand Down
35 changes: 28 additions & 7 deletions plugins/common/shim/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"io"
"io/ioutil"
"math/rand"
"sync"
"testing"
"time"
Expand All @@ -16,7 +17,21 @@ import (
)

func TestProcessorShim(t *testing.T) {
p := &testProcessor{}
testSendAndRecieve(t, "f1", "fv1")
}

func TestProcessorShimWithLargerThanDefaultScannerBufferSize(t *testing.T) {
letters := []rune("ABCDEFGHIJKLMNOPQRSTUVWXYZ")
b := make([]rune, bufio.MaxScanTokenSize*2)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}

testSendAndRecieve(t, "f1", string(b))
}

func testSendAndRecieve(t *testing.T, fieldKey string, fieldValue string) {
p := &testProcessor{"hi", "mom"}

stdinReader, stdinWriter := io.Pipe()
stdoutReader, stdoutWriter := io.Pipe()
Expand Down Expand Up @@ -45,7 +60,8 @@ func TestProcessorShim(t *testing.T) {
"a": "b",
},
map[string]interface{}{
"v": 1,
"v": 1,
fieldKey: fieldValue,
},
time.Now(),
)
Expand All @@ -62,19 +78,24 @@ func TestProcessorShim(t *testing.T) {
mOut, err := parser.ParseLine(out)
require.NoError(t, err)

val, ok := mOut.GetTag("hi")
val, ok := mOut.GetTag(p.tagName)
require.True(t, ok)
require.Equal(t, "mom", val)

require.Equal(t, p.tagValue, val)
val2, ok := mOut.Fields()[fieldKey]
require.True(t, ok)
require.Equal(t, fieldValue, val2)
go ioutil.ReadAll(r)
wg.Wait()
}

type testProcessor struct{}
type testProcessor struct {
tagName string
tagValue string
}

func (p *testProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric {
for _, metric := range in {
metric.AddTag("hi", "mom")
metric.AddTag(p.tagName, p.tagValue)
}
return in
}
Expand Down

0 comments on commit f3229f5

Please sign in to comment.