Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle multiline processors #2063

Merged
merged 10 commits into from
Sep 20, 2024
11 changes: 9 additions & 2 deletions internal/elasticsearch/ingest/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package ingest

import (
"fmt"
"strings"

"gopkg.in/yaml.v3"
)
Expand Down Expand Up @@ -50,7 +51,7 @@ func (p Pipeline) OriginalProcessors() (procs []Processor, err error) {
return procs, nil
}

// extract a list of processors from a pipeline definition in YAML format.
// Extract a list of processors from a pipeline definition in YAML format.
haetamoudi marked this conversation as resolved.
Show resolved Hide resolved
func processorsFromYAML(content []byte) (procs []Processor, err error) {
var p struct {
Processors []yaml.Node
Expand All @@ -76,7 +77,7 @@ func processorsFromYAML(content []byte) (procs []Processor, err error) {
return procs, nil
}

// returns the last (greater) line number used by a yaml.Node.
// Return the last (greater) line number used by a yaml.Node.
haetamoudi marked this conversation as resolved.
Show resolved Hide resolved
func lastLine(node *yaml.Node) int {
if node == nil {
return 0
Expand All @@ -86,6 +87,12 @@ func lastLine(node *yaml.Node) int {
if line := lastLine(inner); line > last {
last = line
}
// For scalar node with multiline content, calculate the last line based on line breaks
if inner.Kind == yaml.ScalarNode && strings.Contains(inner.Value, "\n") {
lineCount := strings.Count(inner.Value, "\n")
last += lineCount
}
}

return last
}
105 changes: 105 additions & 0 deletions internal/elasticsearch/ingest/processors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,42 @@ on_failure:
content: []byte(`{"processors": {"drop": {}},"`),
wantErr: true,
},
{
name: "Json single line processor",
format: "json",
content: []byte(`{
"description": "Pipeline for parsing silly logs.",
"processors": [{"drop": {"if":"ctx.drop!=null"}}]
}`),
expected: []Processor{
{Type: "drop", FirstLine: 3, LastLine: 3},
},
},
{
name: "Json multiline processor",
format: "json",
content: []byte(`{
"processors": [
{
"script": {
"description": "Extract fields",
"lang": "painless",
"source": "String[] envSplit = ctx['env'].splitOnToken(params['delimiter']);\nArrayList tags = new ArrayList();\ntags.add(envSplit[params['position']].trim());\nctx['tags'] = tags;"
}
}
]
}`),
expected: []Processor{
{Type: "script", FirstLine: 3, LastLine: 10},
// Source will be processed as multiline:
// "source": """
// String[] envSplit = ctx['env'].splitOnToken(params['delimiter']);
// ArrayList tags = new ArrayList();
// tags.add(envSplit[params['position']].trim());
// ctx['tags'] = tags;
// """,
},
},
{
name: "Malformed Yaml pipeline",
format: "yml",
Expand All @@ -162,6 +198,75 @@ processors:
content: []byte(`foo123"`),
wantErr: true,
},
{
name: "Yaml single line processor",
format: "yml",
content: []byte(`---
processors:
- set: { field: "event.category", value: "web" }`),
expected: []Processor{
{Type: "set", FirstLine: 3, LastLine: 3},
},
},
{
name: "Yaml multiline processor",
format: "yml",
content: []byte(`---
processors:
- script:
source: |
def a = 1;
def b = 2;
`),
expected: []Processor{
{Type: "script", FirstLine: 3, LastLine: 6},
},
},
{
name: "Yaml empty processor",
format: "yml",
content: []byte(`---
processors:
- set:`),
expected: []Processor{
{Type: "set", FirstLine: 3, LastLine: 3},
},
},
{
name: "Yaml processors with comments",
format: "yml",
content: []byte(`---
processors:
# First processor
- set:
field: "event.category"
value: "web"

# Second processor
- script:
source: |
def a = 1;
def b = 2;
`),
expected: []Processor{
{Type: "set", FirstLine: 4, LastLine: 6},
{Type: "script", FirstLine: 9, LastLine: 12},
},
},
{
name: "Yaml nested processor",
format: "yml",
content: []byte(`---
processors:
- if:
condition: "ctx.event.category == 'web'"
processors:
- set: { field: "event.type", value: "start" }
`),
expected: []Processor{
{Type: "if", FirstLine: 3, LastLine: 6},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
Loading