From ee40f6927f47639974a820f4df4454fbf392251f Mon Sep 17 00:00:00 2001 From: Hanna Tamoudi Date: Fri, 20 Sep 2024 13:14:31 +0200 Subject: [PATCH] Handle multiline processors (#2063) --- internal/elasticsearch/ingest/processors.go | 91 +++++- .../elasticsearch/ingest/processors_test.go | 266 +++++++++++++---- .../runners/pipeline/coverage_test.go | 268 ++++++++++++++++++ 3 files changed, 563 insertions(+), 62 deletions(-) create mode 100644 internal/testrunner/runners/pipeline/coverage_test.go diff --git a/internal/elasticsearch/ingest/processors.go b/internal/elasticsearch/ingest/processors.go index 796d5b348..40805d902 100644 --- a/internal/elasticsearch/ingest/processors.go +++ b/internal/elasticsearch/ingest/processors.go @@ -5,6 +5,8 @@ package ingest import ( + "bufio" + "bytes" "fmt" "gopkg.in/yaml.v3" @@ -50,7 +52,7 @@ func (p Pipeline) OriginalProcessors() (procs []Processor, err error) { return procs, nil } -// extract a list of processors from a pipeline definition in YAML format. +// processorsFromYAML extracts a list of processors from a pipeline definition in YAML format. func processorsFromYAML(content []byte) (procs []Processor, err error) { var p struct { Processors []yaml.Node @@ -58,6 +60,7 @@ func processorsFromYAML(content []byte) (procs []Processor, err error) { if err = yaml.Unmarshal(content, &p); err != nil { return nil, err } + for idx, entry := range p.Processors { if entry.Kind != yaml.MappingNode || len(entry.Content) != 2 { return nil, fmt.Errorf("processor#%d is not a single-key map (kind:%v content:%d)", idx, entry.Kind, len(entry.Content)) @@ -70,22 +73,88 @@ func processorsFromYAML(content []byte) (procs []Processor, err error) { return nil, fmt.Errorf("error decoding processor#%d type: %w", idx, err) } proc.FirstLine = entry.Line - proc.LastLine = lastLine(&entry) + lastLine, err := getProcessorLastLine(idx, p.Processors, proc, content) + if err != nil { + return nil, err + } + proc.LastLine = lastLine + procs = append(procs, proc) } - return procs, nil + return procs, err } -// returns the last (greater) line number used by a yaml.Node. -func lastLine(node *yaml.Node) int { +// getProcessorLastLine determines the last line number for the given processor. +func getProcessorLastLine(idx int, processors []yaml.Node, currentProcessor Processor, content []byte) (int, error) { + if idx < len(processors)-1 { + var endProcessor = processors[idx+1].Line - 1 + if endProcessor < currentProcessor.FirstLine { + return currentProcessor.FirstLine, nil + } else { + return processors[idx+1].Line - 1, nil + } + } + + return nextProcessorOrEndOfPipeline(content) +} + +// nextProcessorOrEndOfPipeline get the line before the node after the processors node. If there is none, it returns the end of file line +func nextProcessorOrEndOfPipeline(content []byte) (int, error) { + var root yaml.Node + if err := yaml.Unmarshal(content, &root); err != nil { + return 0, fmt.Errorf("error unmarshaling YAML: %v", err) + } + + var nodes []*yaml.Node + extractNodesFromMapping(&root, &nodes) + for i, node := range nodes { + + if node.Value == "processors" { + if i < len(nodes)-1 { + + return nodes[i+1].Line - 1, nil + } + } + + } + return countLinesInBytes(content) +} + +// extractNodesFromMapping recursively extracts all nodes from MappingNodes within DocumentNodes. +func extractNodesFromMapping(node *yaml.Node, nodes *[]*yaml.Node) { if node == nil { - return 0 + return } - last := node.Line - for _, inner := range node.Content { - if line := lastLine(inner); line > last { - last = line + + if node.Kind == yaml.DocumentNode { + for _, child := range node.Content { + extractNodesFromMapping(child, nodes) + } + return + } + + if node.Kind == yaml.MappingNode { + for _, child := range node.Content { + if child.Kind == yaml.MappingNode || child.Kind == yaml.ScalarNode { + *nodes = append(*nodes, child) + } + extractNodesFromMapping(child, nodes) } } - return last +} + +// countLinesInBytes counts the number of lines in the given byte slice. +func countLinesInBytes(data []byte) (int, error) { + scanner := bufio.NewScanner(bytes.NewReader(data)) + lineCount := 0 + + for scanner.Scan() { + lineCount++ + } + + if err := scanner.Err(); err != nil { + return 0, fmt.Errorf("error reading data: %w", err) + } + + return lineCount, nil } diff --git a/internal/elasticsearch/ingest/processors_test.go b/internal/elasticsearch/ingest/processors_test.go index 0d3d161d3..4ab43ef5c 100644 --- a/internal/elasticsearch/ingest/processors_test.go +++ b/internal/elasticsearch/ingest/processors_test.go @@ -24,61 +24,104 @@ func TestResource_Processors(t *testing.T) { content: []byte(`--- description: Made up pipeline processors: -# First processor. -- grok: - tag: Extract header - field: message - patterns: - - \[%{APACHE_TIME:apache.error.timestamp}\] \[%{LOGLEVEL:log.level}\]( \[client - %{IPORHOST:source.address}(:%{POSINT:source.port})?\])? %{GREEDYDATA:message} - - \[%{APACHE_TIME:apache.error.timestamp}\] \[%{DATA:apache.error.module}:%{LOGLEVEL:log.level}\] - \[pid %{NUMBER:process.pid:long}(:tid %{NUMBER:process.thread.id:long})?\]( - \[client %{IPORHOST:source.address}(:%{POSINT:source.port})?\])? %{GREEDYDATA:message} - pattern_definitions: - APACHE_TIME: '%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}' - ignore_missing: true + - grok: + tag: Extract header + field: message + patterns: + - \[%{APACHE_TIME:apache.error.timestamp}\] \[%{LOGLEVEL:log.level}\]( \[client%{IPORHOST:source.address}(:%{POSINT:source.port})?\])? %{GREEDYDATA:message} + - \[%{APACHE_TIME:apache.error.timestamp}\] \[%{DATA:apache.error.module}:%{LOGLEVEL:log.level}\] + ignore_missing: true -- date: - field: apache.error.timestamp - target_field: '@timestamp' - formats: - - EEE MMM dd H:m:s yyyy - - EEE MMM dd H:m:s.SSSSSS yyyy - on_failure: - - append: + - date: + field: apache.error.timestamp + target_field: '@timestamp' + formats: + - EEE MMM dd H:m:s yyyy + - EEE MMM dd H:m:s.SSSSSS yyyy + on_failure: + - append: + field: error.message + value: '{{ _ingest.on_failure_message }}' + - set: + description: Set event category + field: event.category + value: web + # Some script + - script: + lang: painless + + - grok: + field: source.address + ignore_missing: true + patterns: + - ^(%{IP:source.ip}|%{HOSTNAME:source.domain})$ + - rename: + field: source.as.organization_name + target_field: source.as.organization.name + ignore_missing: true +on_failure: + - set: field: error.message value: '{{ _ingest.on_failure_message }}' -- set: - description: Set event category - field: event.category - value: web -# Some script -- script: - lang: painless - source: >- - [...] +`), + expected: []Processor{ + {Type: "grok", FirstLine: 4, LastLine: 11}, + {Type: "date", FirstLine: 12, LastLine: 21}, + {Type: "set", FirstLine: 22, LastLine: 26}, + {Type: "script", FirstLine: 27, LastLine: 29}, + {Type: "grok", FirstLine: 30, LastLine: 34}, + {Type: "rename", FirstLine: 35, LastLine: 38}, + }, + }, + { + name: "Yaml pipeline", + format: "yml", + content: []byte(`--- +description: Made up pipeline +processors: + - grok: + tag: Extract header + field: message + patterns: + - \[%{APACHE_TIME:apache.error.timestamp}\] \[%{LOGLEVEL:log.level}\]( \[client%{IPORHOST:source.address}(:%{POSINT:source.port})?\])? %{GREEDYDATA:message} + - \[%{APACHE_TIME:apache.error.timestamp}\] \[%{DATA:apache.error.module}:%{LOGLEVEL:log.level}\] + ignore_missing: true -- grok: - field: source.address - ignore_missing: true - patterns: - - ^(%{IP:source.ip}|%{HOSTNAME:source.domain})$ -- rename: - field: source.as.organization_name - target_field: source.as.organization.name - ignore_missing: true -on_failure: -- set: - field: error.message - value: '{{ _ingest.on_failure_message }}' + - date: + field: apache.error.timestamp + target_field: '@timestamp' + formats: + - EEE MMM dd H:m:s yyyy + - EEE MMM dd H:m:s.SSSSSS yyyy + on_failure: + - append: + field: error.message + value: '{{ _ingest.on_failure_message }}' + - set: + description: Set event category + field: event.category + value: web + # Some script + - script: + lang: painless + + - grok: + field: source.address + ignore_missing: true + patterns: + - ^(%{IP:source.ip}|%{HOSTNAME:source.domain})$ + - rename: + field: source.as.organization_name + target_field: source.as.organization.name + ignore_missing: true `), expected: []Processor{ - {Type: "grok", FirstLine: 5, LastLine: 16}, - {Type: "date", FirstLine: 18, LastLine: 27}, - {Type: "set", FirstLine: 28, LastLine: 31}, - {Type: "script", FirstLine: 33, LastLine: 35}, - {Type: "grok", FirstLine: 38, LastLine: 42}, - {Type: "rename", FirstLine: 43, LastLine: 46}, + {Type: "grok", FirstLine: 4, LastLine: 11}, + {Type: "date", FirstLine: 12, LastLine: 21}, + {Type: "set", FirstLine: 22, LastLine: 26}, + {Type: "script", FirstLine: 27, LastLine: 29}, + {Type: "grok", FirstLine: 30, LastLine: 34}, + {Type: "rename", FirstLine: 35, LastLine: 38}, }, }, { @@ -109,10 +152,10 @@ on_failure: `), expected: []Processor{ {Type: "drop", FirstLine: 3, LastLine: 3}, - {Type: "set", FirstLine: 4, LastLine: 7}, + {Type: "set", FirstLine: 4, LastLine: 8}, {Type: "remove", FirstLine: 9, LastLine: 9}, {Type: "set", FirstLine: 9, LastLine: 9}, - {Type: "set", FirstLine: 10, LastLine: 13}, + {Type: "set", FirstLine: 10, LastLine: 15}, }, }, { @@ -147,6 +190,42 @@ on_failure: content: []byte(`{"processors": {"drop": {}},"`), wantErr: true, }, + { + name: "Json single line processor", + format: "json", + content: []byte(`{ + "description": "Pipeline for parsing silly logs.", + "processors": [{"drop": {"if":"ctx.drop!=null"}}] + }`), + expected: []Processor{ + {Type: "drop", FirstLine: 3, LastLine: 4}, + }, + }, + { + name: "Json multiline processor", + format: "json", + content: []byte(`{ + "processors": [ + { + "script": { + "description": "Extract fields", + "lang": "painless", + "source": "String[] envSplit = ctx['env'].splitOnToken(params['delimiter']);\nArrayList tags = new ArrayList();\ntags.add(envSplit[params['position']].trim());\nctx['tags'] = tags;" + } + } + ] +}`), + expected: []Processor{ + {Type: "script", FirstLine: 3, LastLine: 11}, + // Source will be processed as multiline: + // "source": """ + // String[] envSplit = ctx['env'].splitOnToken(params['delimiter']); + // ArrayList tags = new ArrayList(); + // tags.add(envSplit[params['position']].trim()); + // ctx['tags'] = tags; + // """, + }, + }, { name: "Malformed Yaml pipeline", format: "yml", @@ -162,6 +241,91 @@ processors: content: []byte(`foo123"`), wantErr: true, }, + { + name: "Yaml single line processor", + format: "yml", + content: []byte(`--- +processors: + - set: { field: "event.category", value: "web" }`), + expected: []Processor{ + {Type: "set", FirstLine: 3, LastLine: 3}, + }, + }, + { + name: "Yaml multiline processor", + format: "yml", + content: []byte(`--- +processors: + - script: + source: | + def a = 1; + def b = 2; +`), + expected: []Processor{ + {Type: "script", FirstLine: 3, LastLine: 6}, + }, + }, + { + name: "Yaml script with empty line characters", + format: "yml", + content: []byte(`--- +processors: + - script: + description: Do something. + tag: script_drop_null_empty_values + lang: painless + source: "def a = b \n + ; def b = 2; \n" +`), + expected: []Processor{ + {Type: "script", FirstLine: 3, LastLine: 8}, + }, + }, + { + name: "Yaml empty processor", + format: "yml", + content: []byte(`--- +processors: + - set:`), + expected: []Processor{ + {Type: "set", FirstLine: 3, LastLine: 3}, + }, + }, + { + name: "Yaml processors with comments", + format: "yml", + content: []byte(`--- +processors: + # First processor + - set: + field: "event.category" + value: "web" + + # Second processor + - script: + source: | + def a = 1; + def b = 2; +`), + expected: []Processor{ + {Type: "set", FirstLine: 4, LastLine: 8}, + {Type: "script", FirstLine: 9, LastLine: 12}, + }, + }, + { + name: "Yaml nested processor", + format: "yml", + content: []byte(`--- +processors: + - if: + condition: "ctx.event.category == 'web'" + processors: + - set: { field: "event.type", value: "start" } +`), + expected: []Processor{ + {Type: "if", FirstLine: 3, LastLine: 6}, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/internal/testrunner/runners/pipeline/coverage_test.go b/internal/testrunner/runners/pipeline/coverage_test.go new file mode 100644 index 000000000..146319ff8 --- /dev/null +++ b/internal/testrunner/runners/pipeline/coverage_test.go @@ -0,0 +1,268 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package pipeline + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/elastic-package/internal/elasticsearch/ingest" + "github.com/elastic/elastic-package/internal/testrunner" +) + +func TestGenericCoverageForSinglePipeline(t *testing.T) { + for _, testcase := range []struct { + title string + pipelineRelPath string + src []ingest.Processor + pstats ingest.PipelineStats + expectedLinesCovered int64 + expectedFile *testrunner.GenericFile + }{ + { + title: "Single Processor - covered", + pipelineRelPath: "", + src: []ingest.Processor{ + {Type: "append", FirstLine: 1, LastLine: 1}, + }, + pstats: ingest.PipelineStats{ + StatsRecord: ingest.StatsRecord{ + Count: 9, + Current: 10, + Failed: 11, + TimeInMillis: 12, + }, + Processors: []ingest.ProcessorStats{ + { + Type: "append", + Conditional: true, + Stats: ingest.StatsRecord{ + Count: 13, + Current: 14, + Failed: 15, + TimeInMillis: 16, + }, + }, + }, + }, + expectedLinesCovered: 1, + expectedFile: &testrunner.GenericFile{ + Path: "", + Lines: []*testrunner.GenericLine{ + {LineNumber: 1, Covered: true}, + }, + }, + }, + { + title: "Single Processor - not covered", + pipelineRelPath: "", + src: []ingest.Processor{ + {Type: "append", FirstLine: 1, LastLine: 1}, + }, + pstats: ingest.PipelineStats{ + StatsRecord: ingest.StatsRecord{ + Count: 9, + Current: 10, + Failed: 11, + TimeInMillis: 12, + }, + Processors: []ingest.ProcessorStats{ + { + Type: "append", + Conditional: true, + Stats: ingest.StatsRecord{ + Count: 0, + Current: 14, + Failed: 15, + TimeInMillis: 16, + }, + }, + }, + }, + expectedLinesCovered: 0, + expectedFile: &testrunner.GenericFile{ + Path: "", + Lines: []*testrunner.GenericLine{ + {LineNumber: 1, Covered: false}, + }, + }, + }, + { + title: "Multi Processor - covered", + pipelineRelPath: "", + src: []ingest.Processor{ + {Type: "append", FirstLine: 1, LastLine: 1}, + {Type: "geoip", FirstLine: 2, LastLine: 2}, + }, + pstats: ingest.PipelineStats{ + StatsRecord: ingest.StatsRecord{ + Count: 9, + Current: 10, + Failed: 11, + TimeInMillis: 12, + }, + Processors: []ingest.ProcessorStats{ + { + Type: "append", + Conditional: true, + Stats: ingest.StatsRecord{ + Count: 13, + Current: 14, + Failed: 15, + TimeInMillis: 16, + }, + }, + { + Type: "geoip", + Stats: ingest.StatsRecord{ + Count: 17, + Current: 18, + Failed: 19, + TimeInMillis: 20, + }, + }, + }, + }, + expectedLinesCovered: 2, + expectedFile: &testrunner.GenericFile{ + Path: "", + Lines: []*testrunner.GenericLine{ + {LineNumber: 1, Covered: true}, + {LineNumber: 2, Covered: true}, + }, + }, + }, + } { + t.Run(testcase.title, func(t *testing.T) { + linesCoveredResult, fileResult, _ := genericCoverageForSinglePipeline(testcase.pipelineRelPath, testcase.src, testcase.pstats) + assert.Equal(t, testcase.expectedLinesCovered, linesCoveredResult) + assert.Equal(t, testcase.expectedFile, fileResult) + }) + } +} + +func TestCoberturaForSinglePipeline(t *testing.T) { + const firstProcessorName = "append" + const firstProcessorFirstLine = 1 + const firstProcessorHitCount = 13 + + const secondProcessorName = "geoip" + const secondProcessorFirstLine = 2 + const secondProcessorHitCount = 17 + + const pipelineName = "Pipeline name" + + for _, testcase := range []struct { + title string + pipelineName string + pipelineRelPath string + src []ingest.Processor + pstats ingest.PipelineStats + expectedLinesCovered int64 + expectedClass *testrunner.CoberturaClass + }{ + { + title: "Single Processor - covered", + pipelineName: pipelineName, + pipelineRelPath: "", + src: []ingest.Processor{ + {Type: firstProcessorName, FirstLine: firstProcessorFirstLine, LastLine: firstProcessorFirstLine}, + }, + pstats: ingest.PipelineStats{ + StatsRecord: ingest.StatsRecord{ + Count: 9, + Current: 10, + Failed: 11, + TimeInMillis: 12, + }, + Processors: []ingest.ProcessorStats{ + { + Type: firstProcessorName, + Conditional: true, + Stats: ingest.StatsRecord{ + Count: firstProcessorHitCount, + Current: 14, + Failed: 15, + TimeInMillis: 16, + }, + }, + }, + }, + expectedLinesCovered: 1, + expectedClass: &testrunner.CoberturaClass{ + Name: pipelineName, + Lines: []*testrunner.CoberturaLine{ + {Number: firstProcessorFirstLine, Hits: firstProcessorHitCount}, + }, + Methods: []*testrunner.CoberturaMethod{ + {Name: firstProcessorName, Lines: []*testrunner.CoberturaLine{ + {Number: firstProcessorFirstLine, Hits: firstProcessorHitCount}, + }}, + }, + }, + }, + { + title: "Multi Processor - covered", + pipelineName: pipelineName, + pipelineRelPath: "", + src: []ingest.Processor{ + {Type: firstProcessorName, FirstLine: firstProcessorFirstLine, LastLine: firstProcessorFirstLine}, + {Type: secondProcessorName, FirstLine: secondProcessorFirstLine, LastLine: secondProcessorFirstLine}, + }, + pstats: ingest.PipelineStats{ + StatsRecord: ingest.StatsRecord{ + Count: 9, + Current: 10, + Failed: 11, + TimeInMillis: 12, + }, + Processors: []ingest.ProcessorStats{ + { + Type: firstProcessorName, + Conditional: true, + Stats: ingest.StatsRecord{ + Count: firstProcessorHitCount, + Current: 14, + Failed: 15, + TimeInMillis: 16, + }, + }, + { + Type: secondProcessorName, + Stats: ingest.StatsRecord{ + Count: secondProcessorHitCount, + Current: 18, + Failed: 19, + TimeInMillis: 20, + }, + }, + }, + }, + expectedLinesCovered: 2, + expectedClass: &testrunner.CoberturaClass{ + Name: pipelineName, + Lines: []*testrunner.CoberturaLine{ + {Number: firstProcessorFirstLine, Hits: firstProcessorHitCount}, + {Number: secondProcessorFirstLine, Hits: secondProcessorHitCount}, + }, + Methods: []*testrunner.CoberturaMethod{ + {Name: firstProcessorName, Lines: []*testrunner.CoberturaLine{ + {Number: firstProcessorFirstLine, Hits: firstProcessorHitCount}, + }}, + {Name: secondProcessorName, Lines: []*testrunner.CoberturaLine{ + {Number: secondProcessorFirstLine, Hits: secondProcessorHitCount}, + }}, + }, + }, + }, + } { + t.Run(testcase.title, func(t *testing.T) { + linesCoveredResult, classResult, _ := coberturaForSinglePipeline(testcase.pipelineName, testcase.pipelineRelPath, testcase.src, testcase.pstats) + assert.Equal(t, testcase.expectedLinesCovered, linesCoveredResult) + assert.Equal(t, testcase.expectedClass, classResult) + }) + } +}