From f3229f5ec1ea3234b22ca2b895a1b602eae73efa Mon Sep 17 00:00:00 2001 From: Dominic Tootell Date: Tue, 13 Apr 2021 22:13:46 +0100 Subject: [PATCH] Change to NewStreamParser to accept larger inputs from scanner (#8892) * change to NewStreamParser to accept larger inputs from scanner * fmt changes --- plugins/common/shim/processor.go | 26 +++++++++++--------- plugins/common/shim/processor_test.go | 35 +++++++++++++++++++++------ 2 files changed, 42 insertions(+), 19 deletions(-) diff --git a/plugins/common/shim/processor.go b/plugins/common/shim/processor.go index 33dceba872759..d8f660b360cd6 100644 --- a/plugins/common/shim/processor.go +++ b/plugins/common/shim/processor.go @@ -1,14 +1,13 @@ package shim import ( - "bufio" "fmt" "sync" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/agent" - "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/processors" ) @@ -37,12 +36,7 @@ func (s *Shim) RunProcessor() error { acc := agent.NewAccumulator(s, s.metricCh) acc.SetPrecision(time.Nanosecond) - parser, err := parsers.NewInfluxParser() - if err != nil { - return fmt.Errorf("Failed to create new parser: %w", err) - } - - err = s.Processor.Start(acc) + err := s.Processor.Start(acc) if err != nil { return fmt.Errorf("failed to start processor: %w", err) } @@ -54,13 +48,21 @@ func (s *Shim) RunProcessor() error { wg.Done() }() - scanner := bufio.NewScanner(s.stdin) - for scanner.Scan() { - m, err := parser.ParseLine(scanner.Text()) + parser := influx.NewStreamParser(s.stdin) + for { + m, err := parser.Next() if err != nil { - fmt.Fprintf(s.stderr, "Failed to parse metric: %s\b", err) + if err == influx.EOF { + break // stream ended + } + if parseErr, isParseError := err.(*influx.ParseError); isParseError { + fmt.Fprintf(s.stderr, "Failed to parse metric: %s\b", parseErr) + continue + } + fmt.Fprintf(s.stderr, "Failure during reading stdin: %s\b", err) continue } + s.Processor.Add(m, acc) } diff --git a/plugins/common/shim/processor_test.go b/plugins/common/shim/processor_test.go index 6126656b8fcc6..ea2e61a459469 100644 --- a/plugins/common/shim/processor_test.go +++ b/plugins/common/shim/processor_test.go @@ -4,6 +4,7 @@ import ( "bufio" "io" "io/ioutil" + "math/rand" "sync" "testing" "time" @@ -16,7 +17,21 @@ import ( ) func TestProcessorShim(t *testing.T) { - p := &testProcessor{} + testSendAndRecieve(t, "f1", "fv1") +} + +func TestProcessorShimWithLargerThanDefaultScannerBufferSize(t *testing.T) { + letters := []rune("ABCDEFGHIJKLMNOPQRSTUVWXYZ") + b := make([]rune, bufio.MaxScanTokenSize*2) + for i := range b { + b[i] = letters[rand.Intn(len(letters))] + } + + testSendAndRecieve(t, "f1", string(b)) +} + +func testSendAndRecieve(t *testing.T, fieldKey string, fieldValue string) { + p := &testProcessor{"hi", "mom"} stdinReader, stdinWriter := io.Pipe() stdoutReader, stdoutWriter := io.Pipe() @@ -45,7 +60,8 @@ func TestProcessorShim(t *testing.T) { "a": "b", }, map[string]interface{}{ - "v": 1, + "v": 1, + fieldKey: fieldValue, }, time.Now(), ) @@ -62,19 +78,24 @@ func TestProcessorShim(t *testing.T) { mOut, err := parser.ParseLine(out) require.NoError(t, err) - val, ok := mOut.GetTag("hi") + val, ok := mOut.GetTag(p.tagName) require.True(t, ok) - require.Equal(t, "mom", val) - + require.Equal(t, p.tagValue, val) + val2, ok := mOut.Fields()[fieldKey] + require.True(t, ok) + require.Equal(t, fieldValue, val2) go ioutil.ReadAll(r) wg.Wait() } -type testProcessor struct{} +type testProcessor struct { + tagName string + tagValue string +} func (p *testProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric { for _, metric := range in { - metric.AddTag("hi", "mom") + metric.AddTag(p.tagName, p.tagValue) } return in }