From e7f8f90271bf7fe66c50d5a3488deee388e0b402 Mon Sep 17 00:00:00 2001 From: Hanna Tamoudi Date: Mon, 2 Sep 2024 11:48:35 +0200 Subject: [PATCH 1/8] handle multiline processors when getting last line --- internal/elasticsearch/ingest/processors.go | 11 +- .../elasticsearch/ingest/processors_test.go | 105 +++++++++++++ .../runners/pipeline/coverage_test.go | 141 ++++++++++++++++++ 3 files changed, 255 insertions(+), 2 deletions(-) create mode 100644 internal/testrunner/runners/pipeline/coverage_test.go diff --git a/internal/elasticsearch/ingest/processors.go b/internal/elasticsearch/ingest/processors.go index 796d5b348..da4b339b3 100644 --- a/internal/elasticsearch/ingest/processors.go +++ b/internal/elasticsearch/ingest/processors.go @@ -6,6 +6,7 @@ package ingest import ( "fmt" + "strings" "gopkg.in/yaml.v3" ) @@ -50,7 +51,7 @@ func (p Pipeline) OriginalProcessors() (procs []Processor, err error) { return procs, nil } -// extract a list of processors from a pipeline definition in YAML format. +// Extract a list of processors from a pipeline definition in YAML format. func processorsFromYAML(content []byte) (procs []Processor, err error) { var p struct { Processors []yaml.Node @@ -76,7 +77,7 @@ func processorsFromYAML(content []byte) (procs []Processor, err error) { return procs, nil } -// returns the last (greater) line number used by a yaml.Node. +// Return the last (greater) line number used by a yaml.Node. func lastLine(node *yaml.Node) int { if node == nil { return 0 @@ -86,6 +87,12 @@ func lastLine(node *yaml.Node) int { if line := lastLine(inner); line > last { last = line } + // For scalar node with multiline content, calculate the last line based on line breaks + if inner.Kind == yaml.ScalarNode && strings.Contains(inner.Value, "\n") { + lineCount := strings.Count(inner.Value, "\n") + last += lineCount + } } + return last } diff --git a/internal/elasticsearch/ingest/processors_test.go b/internal/elasticsearch/ingest/processors_test.go index 0d3d161d3..d7be13ceb 100644 --- a/internal/elasticsearch/ingest/processors_test.go +++ b/internal/elasticsearch/ingest/processors_test.go @@ -147,6 +147,42 @@ on_failure: content: []byte(`{"processors": {"drop": {}},"`), wantErr: true, }, + { + name: "Json single line processor", + format: "json", + content: []byte(`{ + "description": "Pipeline for parsing silly logs.", + "processors": [{"drop": {"if":"ctx.drop!=null"}}] + }`), + expected: []Processor{ + {Type: "drop", FirstLine: 3, LastLine: 3}, + }, + }, + { + name: "Json multiline processor", + format: "json", + content: []byte(`{ + "processors": [ + { + "script": { + "description": "Extract fields", + "lang": "painless", + "source": "String[] envSplit = ctx['env'].splitOnToken(params['delimiter']);\nArrayList tags = new ArrayList();\ntags.add(envSplit[params['position']].trim());\nctx['tags'] = tags;" + } + } + ] +}`), + expected: []Processor{ + {Type: "script", FirstLine: 3, LastLine: 10}, + // Source will be processed as multiline: + // "source": """ + // String[] envSplit = ctx['env'].splitOnToken(params['delimiter']); + // ArrayList tags = new ArrayList(); + // tags.add(envSplit[params['position']].trim()); + // ctx['tags'] = tags; + // """, + }, + }, { name: "Malformed Yaml pipeline", format: "yml", @@ -162,6 +198,75 @@ processors: content: []byte(`foo123"`), wantErr: true, }, + { + name: "Yaml single line processor", + format: "yml", + content: []byte(`--- +processors: + - set: { field: "event.category", value: "web" }`), + expected: []Processor{ + {Type: "set", FirstLine: 3, LastLine: 3}, + }, + }, + { + name: "Yaml multiline processor", + format: "yml", + content: []byte(`--- +processors: + - script: + source: | + def a = 1; + def b = 2; +`), + expected: []Processor{ + {Type: "script", FirstLine: 3, LastLine: 6}, + }, + }, + { + name: "Yaml empty processor", + format: "yml", + content: []byte(`--- +processors: + - set:`), + expected: []Processor{ + {Type: "set", FirstLine: 3, LastLine: 3}, + }, + }, + { + name: "Yaml processors with comments", + format: "yml", + content: []byte(`--- +processors: + # First processor + - set: + field: "event.category" + value: "web" + + # Second processor + - script: + source: | + def a = 1; + def b = 2; +`), + expected: []Processor{ + {Type: "set", FirstLine: 4, LastLine: 6}, + {Type: "script", FirstLine: 9, LastLine: 12}, + }, + }, + { + name: "Yaml nested processor", + format: "yml", + content: []byte(`--- +processors: + - if: + condition: "ctx.event.category == 'web'" + processors: + - set: { field: "event.type", value: "start" } +`), + expected: []Processor{ + {Type: "if", FirstLine: 3, LastLine: 6}, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/internal/testrunner/runners/pipeline/coverage_test.go b/internal/testrunner/runners/pipeline/coverage_test.go new file mode 100644 index 000000000..c6b5bf936 --- /dev/null +++ b/internal/testrunner/runners/pipeline/coverage_test.go @@ -0,0 +1,141 @@ +package pipeline + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/elastic-package/internal/elasticsearch/ingest" + "github.com/elastic/elastic-package/internal/testrunner" +) + +func TestGenericCoverageForSinglePipeline(t *testing.T) { + for _, testcase := range []struct { + title string + pipelineRelPath string + src []ingest.Processor + pstats ingest.PipelineStats + expectedLinesCovered int64 + expectedFile *testrunner.GenericFile + }{ + { + title: "Single Processor - covered", + pipelineRelPath: "", + src: []ingest.Processor{ + {Type: "append", FirstLine: 1, LastLine: 1}, + }, + pstats: ingest.PipelineStats{ + StatsRecord: ingest.StatsRecord{ + Count: 9, + Current: 10, + Failed: 11, + TimeInMillis: 12, + }, + Processors: []ingest.ProcessorStats{ + { + Type: "append", + Conditional: true, + Stats: ingest.StatsRecord{ + Count: 13, + Current: 14, + Failed: 15, + TimeInMillis: 16, + }, + }, + }, + }, + expectedLinesCovered: 1, + expectedFile: &testrunner.GenericFile{ + Path: "", + Lines: []*testrunner.GenericLine{ + {LineNumber: 1, Covered: true}, + }, + }, + }, + { + title: "Single Processor - not covered", + pipelineRelPath: "", + src: []ingest.Processor{ + {Type: "append", FirstLine: 1, LastLine: 1}, + }, + pstats: ingest.PipelineStats{ + StatsRecord: ingest.StatsRecord{ + Count: 9, + Current: 10, + Failed: 11, + TimeInMillis: 12, + }, + Processors: []ingest.ProcessorStats{ + { + Type: "append", + Conditional: true, + Stats: ingest.StatsRecord{ + Count: 0, + Current: 14, + Failed: 15, + TimeInMillis: 16, + }, + }, + }, + }, + expectedLinesCovered: 0, + expectedFile: &testrunner.GenericFile{ + Path: "", + Lines: []*testrunner.GenericLine{ + {LineNumber: 1, Covered: false}, + }, + }, + }, + { + title: "Multi Processor - covered", + pipelineRelPath: "", + src: []ingest.Processor{ + {Type: "append", FirstLine: 1, LastLine: 1}, + {Type: "geoip", FirstLine: 2, LastLine: 2}, + }, + pstats: ingest.PipelineStats{ + StatsRecord: ingest.StatsRecord{ + Count: 9, + Current: 10, + Failed: 11, + TimeInMillis: 12, + }, + Processors: []ingest.ProcessorStats{ + { + Type: "append", + Conditional: true, + Stats: ingest.StatsRecord{ + Count: 13, + Current: 14, + Failed: 15, + TimeInMillis: 16, + }, + }, + { + Type: "geoip", + Stats: ingest.StatsRecord{ + Count: 17, + Current: 18, + Failed: 19, + TimeInMillis: 20, + }, + }, + }, + }, + expectedLinesCovered: 2, + expectedFile: &testrunner.GenericFile{ + Path: "", + Lines: []*testrunner.GenericLine{ + {LineNumber: 1, Covered: true}, + {LineNumber: 2, Covered: true}, + }, + }, + }, + } { + t.Run(testcase.title, func(t *testing.T) { + linesCoveredResult, fileResult, _ := genericCoverageForSinglePipeline(testcase.pipelineRelPath, testcase.src, testcase.pstats) + assert.Equal(t, testcase.expectedLinesCovered, linesCoveredResult) + assert.Equal(t, testcase.expectedFile, fileResult) + }) + } +} From 5b437cfc117a31e5a4e4dfa29d8c9d4e060b467e Mon Sep 17 00:00:00 2001 From: Hanna Tamoudi Date: Mon, 2 Sep 2024 12:35:12 +0200 Subject: [PATCH 2/8] add copyright --- internal/testrunner/runners/pipeline/coverage_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/testrunner/runners/pipeline/coverage_test.go b/internal/testrunner/runners/pipeline/coverage_test.go index c6b5bf936..d78b5039e 100644 --- a/internal/testrunner/runners/pipeline/coverage_test.go +++ b/internal/testrunner/runners/pipeline/coverage_test.go @@ -1,3 +1,7 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + package pipeline import ( From d159d3050bb5a4820e1711eed18d60292a71cfd4 Mon Sep 17 00:00:00 2001 From: Hanna Tamoudi Date: Mon, 2 Sep 2024 17:54:19 +0200 Subject: [PATCH 3/8] add test cobertura for single pipeline --- .../runners/pipeline/coverage_test.go | 123 ++++++++++++++++++ 1 file changed, 123 insertions(+) diff --git a/internal/testrunner/runners/pipeline/coverage_test.go b/internal/testrunner/runners/pipeline/coverage_test.go index d78b5039e..45f72a0dd 100644 --- a/internal/testrunner/runners/pipeline/coverage_test.go +++ b/internal/testrunner/runners/pipeline/coverage_test.go @@ -143,3 +143,126 @@ func TestGenericCoverageForSinglePipeline(t *testing.T) { }) } } + +func TestCoberturaForSinglePipeline(t *testing.T) { + const firstProcessorName = "append" + const firstProcessorFirstLine = 1 + const firstProcessorHitCount = 13 + + const secondProcessorName = "geoip" + const secondProcessorFirstLine = 2 + const secondProcessorHitCount = 17 + + const pipelineName = "Pipeline name" + + for _, testcase := range []struct { + title string + pipelineName string + pipelineRelPath string + src []ingest.Processor + pstats ingest.PipelineStats + expectedLinesCovered int64 + expectedClass *testrunner.CoberturaClass + }{ + { + title: "Single Processor - covered", + pipelineName: pipelineName, + pipelineRelPath: "", + src: []ingest.Processor{ + {Type: firstProcessorName, FirstLine: firstProcessorFirstLine, LastLine: firstProcessorFirstLine}, + }, + pstats: ingest.PipelineStats{ + StatsRecord: ingest.StatsRecord{ + Count: 9, + Current: 10, + Failed: 11, + TimeInMillis: 12, + }, + Processors: []ingest.ProcessorStats{ + { + Type: firstProcessorName, + Conditional: true, + Stats: ingest.StatsRecord{ + Count: firstProcessorHitCount, + Current: 14, + Failed: 15, + TimeInMillis: 16, + }, + }, + }, + }, + expectedLinesCovered: 1, + expectedClass: &testrunner.CoberturaClass{ + Name: pipelineName, + Lines: []*testrunner.CoberturaLine{ + {Number: firstProcessorFirstLine, Hits: firstProcessorHitCount}, + }, + Methods: []*testrunner.CoberturaMethod{ + {Name: firstProcessorName, Lines: []*testrunner.CoberturaLine{ + {Number: firstProcessorFirstLine, Hits: firstProcessorHitCount}, + }}, + }, + }, + }, + { + title: "Multi Processor - covered", + pipelineRelPath: "", + src: []ingest.Processor{ + {Type: firstProcessorName, FirstLine: firstProcessorFirstLine, LastLine: firstProcessorFirstLine}, + {Type: secondProcessorName, FirstLine: secondProcessorFirstLine, LastLine: secondProcessorFirstLine}, + }, + pstats: ingest.PipelineStats{ + StatsRecord: ingest.StatsRecord{ + Count: 9, + Current: 10, + Failed: 11, + TimeInMillis: 12, + }, + Processors: []ingest.ProcessorStats{ + { + Type: firstProcessorName, + Conditional: true, + Stats: ingest.StatsRecord{ + Count: firstProcessorHitCount, + Current: 14, + Failed: 15, + TimeInMillis: 16, + }, + }, + { + Type: secondProcessorName, + Stats: ingest.StatsRecord{ + Count: secondProcessorHitCount, + Current: 18, + Failed: 19, + TimeInMillis: 20, + }, + }, + }, + }, + expectedLinesCovered: 2, + expectedClass: &testrunner.CoberturaClass{ + Name: pipelineName, + Lines: []*testrunner.CoberturaLine{ + {Number: firstProcessorFirstLine, Hits: firstProcessorHitCount}, + {Number: secondProcessorFirstLine, Hits: secondProcessorHitCount}, + }, + Methods: []*testrunner.CoberturaMethod{ + {Name: firstProcessorName, Lines: []*testrunner.CoberturaLine{ + {Number: firstProcessorFirstLine, Hits: firstProcessorHitCount}, + }}, + + {Name: secondProcessorFirstLine, Lines: []*testrunner.CoberturaLine{ + {Number: secondProcessorName, Hits: secondProcessorHitCount}, + }}, + }, + }, + }, + } { + t.Run(testcase.title, func(t *testing.T) { + linesCoveredResult, classResult, _ := coberturaForSinglePipeline(testcase.pipelineName, testcase.pipelineRelPath, testcase.src, testcase.pstats) + assert.Equal(t, testcase.expectedLinesCovered, linesCoveredResult) + assert.Equal(t, testcase.expectedClass, classResult) + }) + } +} From 06d6ac1016cf31174b73cf87290eb09a54f51edb Mon Sep 17 00:00:00 2001 From: Hanna Tamoudi Date: Mon, 2 Sep 2024 18:20:19 +0200 Subject: [PATCH 4/8] fix typo in test --- internal/testrunner/runners/pipeline/coverage_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/testrunner/runners/pipeline/coverage_test.go b/internal/testrunner/runners/pipeline/coverage_test.go index 45f72a0dd..146319ff8 100644 --- a/internal/testrunner/runners/pipeline/coverage_test.go +++ b/internal/testrunner/runners/pipeline/coverage_test.go @@ -206,6 +206,7 @@ func TestCoberturaForSinglePipeline(t *testing.T) { }, { title: "Multi Processor - covered", + pipelineName: pipelineName, pipelineRelPath: "", src: []ingest.Processor{ {Type: firstProcessorName, FirstLine: firstProcessorFirstLine, LastLine: firstProcessorFirstLine}, @@ -251,9 +252,8 @@ func TestCoberturaForSinglePipeline(t *testing.T) { {Name: firstProcessorName, Lines: []*testrunner.CoberturaLine{ {Number: firstProcessorFirstLine, Hits: firstProcessorHitCount}, }}, - - {Name: secondProcessorFirstLine, Lines: []*testrunner.CoberturaLine{ - {Number: secondProcessorName, Hits: secondProcessorHitCount}, + {Name: secondProcessorName, Lines: []*testrunner.CoberturaLine{ + {Number: secondProcessorFirstLine, Hits: secondProcessorHitCount}, }}, }, }, From 09a10407b949a014a65dbd036686ac9743edb4e1 Mon Sep 17 00:00:00 2001 From: Hanna Tamoudi Date: Tue, 3 Sep 2024 10:21:40 +0200 Subject: [PATCH 5/8] Update internal/elasticsearch/ingest/processors.go Co-authored-by: Jaime Soriano Pastor --- internal/elasticsearch/ingest/processors.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/elasticsearch/ingest/processors.go b/internal/elasticsearch/ingest/processors.go index da4b339b3..3b815b2dc 100644 --- a/internal/elasticsearch/ingest/processors.go +++ b/internal/elasticsearch/ingest/processors.go @@ -77,7 +77,7 @@ func processorsFromYAML(content []byte) (procs []Processor, err error) { return procs, nil } -// Return the last (greater) line number used by a yaml.Node. +// lastLine returns the last (greater) line number used by a yaml.Node. func lastLine(node *yaml.Node) int { if node == nil { return 0 From 6d8c1a8b495baf3b119f6a69c0e5b813f06aa236 Mon Sep 17 00:00:00 2001 From: Hanna Tamoudi Date: Tue, 3 Sep 2024 10:21:47 +0200 Subject: [PATCH 6/8] Update internal/elasticsearch/ingest/processors.go Co-authored-by: Jaime Soriano Pastor --- internal/elasticsearch/ingest/processors.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/elasticsearch/ingest/processors.go b/internal/elasticsearch/ingest/processors.go index 3b815b2dc..92f11b240 100644 --- a/internal/elasticsearch/ingest/processors.go +++ b/internal/elasticsearch/ingest/processors.go @@ -51,7 +51,7 @@ func (p Pipeline) OriginalProcessors() (procs []Processor, err error) { return procs, nil } -// Extract a list of processors from a pipeline definition in YAML format. +// processorsFromYAML extracts a list of processors from a pipeline definition in YAML format. func processorsFromYAML(content []byte) (procs []Processor, err error) { var p struct { Processors []yaml.Node From 9d2902f391c2c03ae6b95e3d0e7a6652148c08dc Mon Sep 17 00:00:00 2001 From: Hanna Tamoudi Date: Thu, 5 Sep 2024 14:02:49 +0200 Subject: [PATCH 7/8] processor ends when next one start --- internal/elasticsearch/ingest/processors.go | 92 ++++++++-- .../elasticsearch/ingest/processors_test.go | 167 ++++++++++++------ 2 files changed, 190 insertions(+), 69 deletions(-) diff --git a/internal/elasticsearch/ingest/processors.go b/internal/elasticsearch/ingest/processors.go index 92f11b240..e692eef68 100644 --- a/internal/elasticsearch/ingest/processors.go +++ b/internal/elasticsearch/ingest/processors.go @@ -5,8 +5,9 @@ package ingest import ( + "bufio" + "bytes" "fmt" - "strings" "gopkg.in/yaml.v3" ) @@ -59,6 +60,7 @@ func processorsFromYAML(content []byte) (procs []Processor, err error) { if err = yaml.Unmarshal(content, &p); err != nil { return nil, err } + for idx, entry := range p.Processors { if entry.Kind != yaml.MappingNode || len(entry.Content) != 2 { return nil, fmt.Errorf("processor#%d is not a single-key map (kind:%v content:%d)", idx, entry.Kind, len(entry.Content)) @@ -71,28 +73,88 @@ func processorsFromYAML(content []byte) (procs []Processor, err error) { return nil, fmt.Errorf("error decoding processor#%d type: %w", idx, err) } proc.FirstLine = entry.Line - proc.LastLine = lastLine(&entry) + lastLine, err := getProcessorLastLine(idx, p.Processors, proc, content) + if err != nil { + return nil, err + } + proc.LastLine = lastLine + procs = append(procs, proc) } - return procs, nil + return procs, err +} + +// getProcessorLastLine determines the last line number for the given processor. +func getProcessorLastLine(idx int, processors []yaml.Node, currentProcessor Processor, content []byte) (int, error) { + if idx < len(processors)-1 { + var endProcessor = processors[idx+1].Line - 1 + if endProcessor < currentProcessor.FirstLine { + return currentProcessor.FirstLine, nil + } else { + return processors[idx+1].Line - 1, nil + } + } + + return nextProcessorOrEndOfPipeline(content) } -// lastLine returns the last (greater) line number used by a yaml.Node. -func lastLine(node *yaml.Node) int { +// lastProcessorLine get the line before the node after the processors node. If there is none, it returns the end of file line +func nextProcessorOrEndOfPipeline(content []byte) (int, error) { + var root yaml.Node + if err := yaml.Unmarshal(content, &root); err != nil { + return 0, fmt.Errorf("error unmarshaling YAML: %v", err) + } + + var nodes []*yaml.Node + extractNodesFromMapping(&root, &nodes) + for i, node := range nodes { + + if node.Value == "processors" { + if i < len(nodes)-1 { + + return nodes[i+1].Line - 1, nil + } + } + + } + return countLinesInBytes(content) +} + +// extractNodesFromMapping recursively extracts all nodes from MappingNodes within DocumentNodes. +func extractNodesFromMapping(node *yaml.Node, nodes *[]*yaml.Node) { if node == nil { - return 0 + return } - last := node.Line - for _, inner := range node.Content { - if line := lastLine(inner); line > last { - last = line + + if node.Kind == yaml.DocumentNode { + for _, child := range node.Content { + extractNodesFromMapping(child, nodes) } - // For scalar node with multiline content, calculate the last line based on line breaks - if inner.Kind == yaml.ScalarNode && strings.Contains(inner.Value, "\n") { - lineCount := strings.Count(inner.Value, "\n") - last += lineCount + return + } + + if node.Kind == yaml.MappingNode { + for _, child := range node.Content { + if child.Kind == yaml.MappingNode || child.Kind == yaml.ScalarNode { + *nodes = append(*nodes, child) + } + extractNodesFromMapping(child, nodes) } } +} + +// countLinesInBytes counts the number of lines in the given byte slice. +func countLinesInBytes(data []byte) (int, error) { + scanner := bufio.NewScanner(bytes.NewReader(data)) + lineCount := 0 + + for scanner.Scan() { + lineCount++ + } + + if err := scanner.Err(); err != nil { + return 0, fmt.Errorf("error reading data: %w", err) + } - return last + return lineCount, nil } diff --git a/internal/elasticsearch/ingest/processors_test.go b/internal/elasticsearch/ingest/processors_test.go index d7be13ceb..4ab43ef5c 100644 --- a/internal/elasticsearch/ingest/processors_test.go +++ b/internal/elasticsearch/ingest/processors_test.go @@ -24,61 +24,104 @@ func TestResource_Processors(t *testing.T) { content: []byte(`--- description: Made up pipeline processors: -# First processor. -- grok: - tag: Extract header - field: message - patterns: - - \[%{APACHE_TIME:apache.error.timestamp}\] \[%{LOGLEVEL:log.level}\]( \[client - %{IPORHOST:source.address}(:%{POSINT:source.port})?\])? %{GREEDYDATA:message} - - \[%{APACHE_TIME:apache.error.timestamp}\] \[%{DATA:apache.error.module}:%{LOGLEVEL:log.level}\] - \[pid %{NUMBER:process.pid:long}(:tid %{NUMBER:process.thread.id:long})?\]( - \[client %{IPORHOST:source.address}(:%{POSINT:source.port})?\])? %{GREEDYDATA:message} - pattern_definitions: - APACHE_TIME: '%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}' - ignore_missing: true + - grok: + tag: Extract header + field: message + patterns: + - \[%{APACHE_TIME:apache.error.timestamp}\] \[%{LOGLEVEL:log.level}\]( \[client%{IPORHOST:source.address}(:%{POSINT:source.port})?\])? %{GREEDYDATA:message} + - \[%{APACHE_TIME:apache.error.timestamp}\] \[%{DATA:apache.error.module}:%{LOGLEVEL:log.level}\] + ignore_missing: true -- date: - field: apache.error.timestamp - target_field: '@timestamp' - formats: - - EEE MMM dd H:m:s yyyy - - EEE MMM dd H:m:s.SSSSSS yyyy - on_failure: - - append: + - date: + field: apache.error.timestamp + target_field: '@timestamp' + formats: + - EEE MMM dd H:m:s yyyy + - EEE MMM dd H:m:s.SSSSSS yyyy + on_failure: + - append: + field: error.message + value: '{{ _ingest.on_failure_message }}' + - set: + description: Set event category + field: event.category + value: web + # Some script + - script: + lang: painless + + - grok: + field: source.address + ignore_missing: true + patterns: + - ^(%{IP:source.ip}|%{HOSTNAME:source.domain})$ + - rename: + field: source.as.organization_name + target_field: source.as.organization.name + ignore_missing: true +on_failure: + - set: field: error.message value: '{{ _ingest.on_failure_message }}' -- set: - description: Set event category - field: event.category - value: web -# Some script -- script: - lang: painless - source: >- - [...] +`), + expected: []Processor{ + {Type: "grok", FirstLine: 4, LastLine: 11}, + {Type: "date", FirstLine: 12, LastLine: 21}, + {Type: "set", FirstLine: 22, LastLine: 26}, + {Type: "script", FirstLine: 27, LastLine: 29}, + {Type: "grok", FirstLine: 30, LastLine: 34}, + {Type: "rename", FirstLine: 35, LastLine: 38}, + }, + }, + { + name: "Yaml pipeline", + format: "yml", + content: []byte(`--- +description: Made up pipeline +processors: + - grok: + tag: Extract header + field: message + patterns: + - \[%{APACHE_TIME:apache.error.timestamp}\] \[%{LOGLEVEL:log.level}\]( \[client%{IPORHOST:source.address}(:%{POSINT:source.port})?\])? %{GREEDYDATA:message} + - \[%{APACHE_TIME:apache.error.timestamp}\] \[%{DATA:apache.error.module}:%{LOGLEVEL:log.level}\] + ignore_missing: true -- grok: - field: source.address - ignore_missing: true - patterns: - - ^(%{IP:source.ip}|%{HOSTNAME:source.domain})$ -- rename: - field: source.as.organization_name - target_field: source.as.organization.name - ignore_missing: true -on_failure: -- set: - field: error.message - value: '{{ _ingest.on_failure_message }}' + - date: + field: apache.error.timestamp + target_field: '@timestamp' + formats: + - EEE MMM dd H:m:s yyyy + - EEE MMM dd H:m:s.SSSSSS yyyy + on_failure: + - append: + field: error.message + value: '{{ _ingest.on_failure_message }}' + - set: + description: Set event category + field: event.category + value: web + # Some script + - script: + lang: painless + + - grok: + field: source.address + ignore_missing: true + patterns: + - ^(%{IP:source.ip}|%{HOSTNAME:source.domain})$ + - rename: + field: source.as.organization_name + target_field: source.as.organization.name + ignore_missing: true `), expected: []Processor{ - {Type: "grok", FirstLine: 5, LastLine: 16}, - {Type: "date", FirstLine: 18, LastLine: 27}, - {Type: "set", FirstLine: 28, LastLine: 31}, - {Type: "script", FirstLine: 33, LastLine: 35}, - {Type: "grok", FirstLine: 38, LastLine: 42}, - {Type: "rename", FirstLine: 43, LastLine: 46}, + {Type: "grok", FirstLine: 4, LastLine: 11}, + {Type: "date", FirstLine: 12, LastLine: 21}, + {Type: "set", FirstLine: 22, LastLine: 26}, + {Type: "script", FirstLine: 27, LastLine: 29}, + {Type: "grok", FirstLine: 30, LastLine: 34}, + {Type: "rename", FirstLine: 35, LastLine: 38}, }, }, { @@ -109,10 +152,10 @@ on_failure: `), expected: []Processor{ {Type: "drop", FirstLine: 3, LastLine: 3}, - {Type: "set", FirstLine: 4, LastLine: 7}, + {Type: "set", FirstLine: 4, LastLine: 8}, {Type: "remove", FirstLine: 9, LastLine: 9}, {Type: "set", FirstLine: 9, LastLine: 9}, - {Type: "set", FirstLine: 10, LastLine: 13}, + {Type: "set", FirstLine: 10, LastLine: 15}, }, }, { @@ -155,7 +198,7 @@ on_failure: "processors": [{"drop": {"if":"ctx.drop!=null"}}] }`), expected: []Processor{ - {Type: "drop", FirstLine: 3, LastLine: 3}, + {Type: "drop", FirstLine: 3, LastLine: 4}, }, }, { @@ -173,7 +216,7 @@ on_failure: ] }`), expected: []Processor{ - {Type: "script", FirstLine: 3, LastLine: 10}, + {Type: "script", FirstLine: 3, LastLine: 11}, // Source will be processed as multiline: // "source": """ // String[] envSplit = ctx['env'].splitOnToken(params['delimiter']); @@ -222,6 +265,22 @@ processors: {Type: "script", FirstLine: 3, LastLine: 6}, }, }, + { + name: "Yaml script with empty line characters", + format: "yml", + content: []byte(`--- +processors: + - script: + description: Do something. + tag: script_drop_null_empty_values + lang: painless + source: "def a = b \n + ; def b = 2; \n" +`), + expected: []Processor{ + {Type: "script", FirstLine: 3, LastLine: 8}, + }, + }, { name: "Yaml empty processor", format: "yml", @@ -249,7 +308,7 @@ processors: def b = 2; `), expected: []Processor{ - {Type: "set", FirstLine: 4, LastLine: 6}, + {Type: "set", FirstLine: 4, LastLine: 8}, {Type: "script", FirstLine: 9, LastLine: 12}, }, }, From 2c439bb479c32b1f283c3970fb8cec90aac2d441 Mon Sep 17 00:00:00 2001 From: Hanna Tamoudi Date: Mon, 16 Sep 2024 11:03:23 +0200 Subject: [PATCH 8/8] Update internal/elasticsearch/ingest/processors.go Co-authored-by: Mario Rodriguez Molins --- internal/elasticsearch/ingest/processors.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/elasticsearch/ingest/processors.go b/internal/elasticsearch/ingest/processors.go index e692eef68..40805d902 100644 --- a/internal/elasticsearch/ingest/processors.go +++ b/internal/elasticsearch/ingest/processors.go @@ -98,7 +98,7 @@ func getProcessorLastLine(idx int, processors []yaml.Node, currentProcessor Proc return nextProcessorOrEndOfPipeline(content) } -// lastProcessorLine get the line before the node after the processors node. If there is none, it returns the end of file line +// nextProcessorOrEndOfPipeline get the line before the node after the processors node. If there is none, it returns the end of file line func nextProcessorOrEndOfPipeline(content []byte) (int, error) { var root yaml.Node if err := yaml.Unmarshal(content, &root); err != nil {