Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic pipelines - a new foreach block #1480

Merged
merged 33 commits into from
Feb 12, 2025
Merged
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
94509e7
Proposal for dynamic pipelines
ptodev Aug 15, 2024
1f2ce8d
Foreach prototype
ptodev Oct 31, 2024
247b68d
Initial implementation
ptodev Nov 20, 2024
afdeb23
wip
wildum Nov 20, 2024
b917da4
Fixes to summation1 and summation2
ptodev Dec 17, 2024
439b300
fix foreach run
wildum Dec 17, 2024
7de0aa1
foreach uses the value from the collection via the var
wildum Dec 18, 2024
5174a33
compute an ID for the foreach instances and add tests
wildum Dec 19, 2024
2290e44
rework foreach txtar tests
wildum Jan 7, 2025
534e07e
support using modules inside of foreach
wildum Jan 8, 2025
68b7c7c
cleanup
wildum Jan 9, 2025
e192510
update frontend to use the moduleID of the component instead of the m…
wildum Jan 9, 2025
56437f7
plug the foreach node to the UI
wildum Jan 9, 2025
2625f1d
fix internal template components link
wildum Jan 9, 2025
91778c6
update comment in component references
wildum Jan 10, 2025
0d95adc
cleanups
wildum Jan 13, 2025
0b9f400
Disable debug metrics for components inside foreach, and for foreach …
ptodev Jan 15, 2025
6148600
Add stability lvl to config blocks (#2441)
wildum Jan 17, 2025
b91aee3
Add tests for types other than integers (#2436)
ptodev Jan 23, 2025
b690b16
Add docs for foreach (#2447)
ptodev Jan 23, 2025
6bf628c
use full hash on foreach instances and fix test
wildum Jan 27, 2025
417bd4d
Add a changelog entry.
ptodev Jan 27, 2025
86435e4
typo
wildum Jan 31, 2025
0b42dc3
allow non alphanum strings
wildum Jan 31, 2025
8611fa3
add test for wrong collection type
wildum Jan 31, 2025
7db7859
added capsule test
wildum Jan 31, 2025
1ab03b9
Add more tests for non-alphanumeric strings.
ptodev Jan 31, 2025
00c8dca
Apply suggestions from code review
ptodev Feb 10, 2025
737cd0d
Apply suggestions from code review
ptodev Feb 10, 2025
9781fab
Add comments regarding the override registry for modules
ptodev Feb 10, 2025
5ec5380
Rename hashObject to objectFingerprint
ptodev Feb 11, 2025
8f2f642
add comment for the hash function
wildum Feb 12, 2025
43c4d74
add additional detail to the comment
wildum Feb 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add tests for types other than integers (#2436)
* Add tests for types other than integers

* Minor fixes to string_receiver

* Add a foreach test for maps which contain maps
ptodev committed Jan 27, 2025
commit b91aee38dd384eae94a3f082951ed95b732218fe
55 changes: 55 additions & 0 deletions internal/runtime/foreach_stringer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package runtime_test

import (
"context"
"os"
"path/filepath"
"sync"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestForeachStringer(t *testing.T) {
directory := "./testdata/foreach_stringer"
for _, file := range getTestFiles(directory, t) {
tc := buildTestForEach(t, filepath.Join(directory, file.Name()))
t.Run(file.Name(), func(t *testing.T) {
if tc.module != "" {
defer os.Remove("module.alloy")
require.NoError(t, os.WriteFile("module.alloy", []byte(tc.module), 0664))
}
testConfigForEachStringer(t, tc.main, *tc.expectedDebugInfo)
})
}
}

func testConfigForEachStringer(t *testing.T, config string, expectedDebugInfo string) {
defer verifyNoGoroutineLeaks(t)
reg := prometheus.NewRegistry()
ctrl, f := setup(t, config, reg)

err := ctrl.LoadSource(f, nil, "")
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
defer func() {
cancel()
wg.Wait()
}()

wg.Add(1)
go func() {
defer wg.Done()
ctrl.Run(ctx)
}()

require.EventuallyWithT(t, func(c *assert.CollectT) {
debugInfo := getDebugInfo[string](t, ctrl, "", "testcomponents.string_receiver.log")
require.Equal(t, expectedDebugInfo, debugInfo)
}, 3*time.Second, 10*time.Millisecond)
}
6 changes: 5 additions & 1 deletion internal/runtime/foreach_test.go
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@ func TestForeach(t *testing.T) {
directory := "./testdata/foreach"
mattdurham marked this conversation as resolved.
Show resolved Hide resolved
for _, file := range getTestFiles(directory, t) {
tc := buildTestForEach(t, filepath.Join(directory, file.Name()))
t.Run(tc.description, func(t *testing.T) {
t.Run(file.Name(), func(t *testing.T) {
if tc.module != "" {
defer os.Remove("module.alloy")
require.NoError(t, os.WriteFile("module.alloy", []byte(tc.module), 0664))
@@ -68,6 +68,7 @@ type testForEachFile struct {
update *updateFile // update can be used to update the content of a file at runtime
expectedMetrics *string // expected prometheus metrics
expectedDurationMetrics *int // expected prometheus duration metrics - check those separately as they vary with each test run
expectedDebugInfo *string // expected debug info after running the config
}

func buildTestForEach(t *testing.T, filename string) testForEachFile {
@@ -96,6 +97,9 @@ func buildTestForEach(t *testing.T, filename string) testForEachFile {
expectedDurationMetrics, err := strconv.Atoi(strings.TrimSpace(string((alloyConfig.Data))))
require.NoError(t, err)
tc.expectedDurationMetrics = &expectedDurationMetrics
case "expected_debug_info.txt":
expectedDebugInfo := string(alloyConfig.Data)
tc.expectedDebugInfo = &expectedDebugInfo
}
}
return tc
16 changes: 15 additions & 1 deletion internal/runtime/import_test.go
Original file line number Diff line number Diff line change
@@ -393,5 +393,19 @@ func getTestFiles(directory string, t *testing.T) []fs.FileInfo {
files, err := dir.Readdir(-1)
require.NoError(t, err)

return files
// Don't use files which start with a dot (".").
// This is to prevent the test suite from using files such as ".DS_Store",
// which Visual Studio Code may add.
return filterFiles(files, ".")
}

// Only take into account files which don't have a certain prefix.
func filterFiles(files []fs.FileInfo, denylistedPrefix string) []fs.FileInfo {
res := make([]fs.FileInfo, 0, len(files))
for _, file := range files {
if !strings.HasPrefix(file.Name(), denylistedPrefix) {
res = append(res, file)
}
}
return res
}
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ import (
"fmt"
"hash/fnv"
"path"
"strings"
"sync"
"time"

@@ -381,9 +382,14 @@ func computeHash(s string) string {
}

func hashObject(obj any) string {
ptodev marked this conversation as resolved.
Show resolved Hide resolved
//TODO: Test what happens if there is a "true" string and a true bool in the collection.
switch v := obj.(type) {
case int, string, float64, bool:
case int, string, bool:
return fmt.Sprintf("%v", v)
case float64:
// Dots are not valid characters in Alloy syntax identifiers.
// For example, "foreach_3.14_1" should become "foreach_3_14_1".
return strings.Replace(fmt.Sprintf("%f", v), ".", "_", -1)
mattdurham marked this conversation as resolved.
Show resolved Hide resolved
default:
return computeHash(fmt.Sprintf("%#v", v))
}
91 changes: 91 additions & 0 deletions internal/runtime/internal/testcomponents/string_receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package testcomponents

import (
"context"
"sync"

"github.com/go-kit/log"
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/featuregate"
)

func init() {
component.Register(component.Registration{
Name: "testcomponents.string_receiver",
Stability: featuregate.StabilityPublicPreview,
Args: StringReceiverConfig{},
Exports: StringReceiverExports{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return NewStringReceiverComp(opts, args.(StringReceiverConfig))
},
})
}

type StringReceiverConfig struct {
}

type StringReceiver interface {
Receive(string)
}

type StringReceiverImpl struct {
log func(string)
}

func (r StringReceiverImpl) Receive(s string) {
r.log(s)
}

type StringReceiverExports struct {
Receiver StringReceiver `alloy:"receiver,attr"`
}

type StringReceiverComponent struct {
opts component.Options
log log.Logger

mut sync.Mutex
recvStr string
receiver StringReceiver
}

// NewStringReceiver creates a new string_receiver component.
func NewStringReceiverComp(o component.Options, cfg StringReceiverConfig) (*StringReceiverComponent, error) {
s := &StringReceiverComponent{opts: o, log: o.Logger}
s.receiver = StringReceiverImpl{
log: func(str string) {
s.mut.Lock()
defer s.mut.Unlock()
s.recvStr += str + "\n"
},
}

o.OnStateChange(StringReceiverExports{
Receiver: s.receiver,
})

return s, nil
}

var (
_ component.Component = (*StringReceiverComponent)(nil)
)

// Run implements Component.
func (s *StringReceiverComponent) Run(ctx context.Context) error {
<-ctx.Done()
return nil
}

// Return the receiver as debug info instead of export to avoid evaluation loop.
func (s *StringReceiverComponent) DebugInfo() interface{} {
s.mut.Lock()
defer s.mut.Unlock()
return s.recvStr
}

// Update implements Component.
func (s *StringReceiverComponent) Update(args component.Arguments) error {
return nil
}
95 changes: 95 additions & 0 deletions internal/runtime/internal/testcomponents/stringer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package testcomponents

import (
"context"
"fmt"

"github.com/go-kit/log"
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/featuregate"
)

// testcomponents.stringer takes in an Alloy value, converts it to a string, and forwards it to the defined receivers.
func init() {
component.Register(component.Registration{
Name: "testcomponents.stringer",
Stability: featuregate.StabilityPublicPreview,
Args: StringerConfig{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return NewStringer(opts, args.(StringerConfig))
},
})
}

type StringerConfig struct {
InputString *string `alloy:"input_string,attr,optional"`
InputInt *int `alloy:"input_int,attr,optional"`
InputFloat *float64 `alloy:"input_float,attr,optional"`
InputBool *bool `alloy:"input_bool,attr,optional"`
InputMap *map[string]any `alloy:"input_map,attr,optional"`
InputArray *[]any `alloy:"input_array,attr,optional"`
ForwardTo []StringReceiver `alloy:"forward_to,attr"`
}

type Stringer struct {
opts component.Options
log log.Logger
cfgUpdate chan StringerConfig
}

func NewStringer(o component.Options, cfg StringerConfig) (*Stringer, error) {
t := &Stringer{
opts: o,
log: o.Logger,
cfgUpdate: make(chan StringerConfig, 10),
}
return t, nil
}

var (
_ component.Component = (*Stringer)(nil)
)

func forward(val any, to []StringReceiver) {
for _, r := range to {
str := fmt.Sprintf("%#v", val)
r.Receive(str)
}
}

func (s *Stringer) Run(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case cfg := <-s.cfgUpdate:
// Send the new values to the receivers
if cfg.InputString != nil {
forward(*cfg.InputString, cfg.ForwardTo)
}
if cfg.InputInt != nil {
forward(*cfg.InputInt, cfg.ForwardTo)
}
if cfg.InputFloat != nil {
forward(*cfg.InputFloat, cfg.ForwardTo)
}
if cfg.InputBool != nil {
forward(*cfg.InputBool, cfg.ForwardTo)
}
if cfg.InputArray != nil {
forward(*cfg.InputArray, cfg.ForwardTo)
}
if cfg.InputMap != nil {
forward(*cfg.InputMap, cfg.ForwardTo)
}
}
}
}

// Update implements Component.
func (s *Stringer) Update(args component.Arguments) error {
cfg := args.(StringerConfig)
s.cfgUpdate <- cfg
return nil
}
20 changes: 20 additions & 0 deletions internal/runtime/testdata/foreach/foreach_10.txtar
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
A collection containing arrays.

-- main.alloy --
foreach "testForeach" {
collection = [[10, 4, 100], [20, 6, 200]]
var = "num"

template {
testcomponents.pulse "pt" {
// Only ingest the 4 and the 6.
max = num[1]
frequency = "10ms"
forward_to = [testcomponents.summation_receiver.sum.receiver]
}
}
}

// Similar to testcomponents.summation, but with a "receiver" export
testcomponents.summation_receiver "sum" {
}
19 changes: 19 additions & 0 deletions internal/runtime/testdata/foreach/foreach_11.txtar
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
A collection containing maps which contain maps.

-- main.alloy --
foreach "testForeach" {
collection = [{"a" = {"c" = 3}}, {"a" = {"c" = 7}}]
var = "num"

template {
testcomponents.pulse "pt" {
max = num["a"]["c"]
frequency = "10ms"
forward_to = [testcomponents.summation_receiver.sum.receiver]
}
}
}

// Similar to testcomponents.summation, but with a "receiver" export
testcomponents.summation_receiver "sum" {
}
19 changes: 19 additions & 0 deletions internal/runtime/testdata/foreach/foreach_9.txtar
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
A collection containing maps.

-- main.alloy --
foreach "testForeach" {
collection = [{"a" = 4}, {"a" = 6}]
var = "num"

template {
testcomponents.pulse "pt" {
max = num["a"]
frequency = "10ms"
forward_to = [testcomponents.summation_receiver.sum.receiver]
}
}
}

// Similar to testcomponents.summation, but with a "receiver" export
testcomponents.summation_receiver "sum" {
}
22 changes: 22 additions & 0 deletions internal/runtime/testdata/foreach_stringer/foreach_1.txtar
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
A collection containing an int.

-- main.alloy --
foreach "testForeach" {
collection = [1]
var = "item"

template {
testcomponents.stringer "st" {
input_int = item
forward_to = [testcomponents.string_receiver.log.receiver]
}
}
}

// Receive strings and append them to a log,
// separated by a new line.
testcomponents.string_receiver "log" {
}

-- expected_debug_info.txt --
1
Loading