Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic pipelines - a new foreach block #1480

Merged
merged 33 commits into from
Feb 12, 2025
Merged
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
94509e7
Proposal for dynamic pipelines
ptodev Aug 15, 2024
1f2ce8d
Foreach prototype
ptodev Oct 31, 2024
247b68d
Initial implementation
ptodev Nov 20, 2024
afdeb23
wip
wildum Nov 20, 2024
b917da4
Fixes to summation1 and summation2
ptodev Dec 17, 2024
439b300
fix foreach run
wildum Dec 17, 2024
7de0aa1
foreach uses the value from the collection via the var
wildum Dec 18, 2024
5174a33
compute an ID for the foreach instances and add tests
wildum Dec 19, 2024
2290e44
rework foreach txtar tests
wildum Jan 7, 2025
534e07e
support using modules inside of foreach
wildum Jan 8, 2025
68b7c7c
cleanup
wildum Jan 9, 2025
e192510
update frontend to use the moduleID of the component instead of the m…
wildum Jan 9, 2025
56437f7
plug the foreach node to the UI
wildum Jan 9, 2025
2625f1d
fix internal template components link
wildum Jan 9, 2025
91778c6
update comment in component references
wildum Jan 10, 2025
0d95adc
cleanups
wildum Jan 13, 2025
0b9f400
Disable debug metrics for components inside foreach, and for foreach …
ptodev Jan 15, 2025
6148600
Add stability lvl to config blocks (#2441)
wildum Jan 17, 2025
b91aee3
Add tests for types other than integers (#2436)
ptodev Jan 23, 2025
b690b16
Add docs for foreach (#2447)
ptodev Jan 23, 2025
6bf628c
use full hash on foreach instances and fix test
wildum Jan 27, 2025
417bd4d
Add a changelog entry.
ptodev Jan 27, 2025
86435e4
typo
wildum Jan 31, 2025
0b42dc3
allow non alphanum strings
wildum Jan 31, 2025
8611fa3
add test for wrong collection type
wildum Jan 31, 2025
7db7859
added capsule test
wildum Jan 31, 2025
1ab03b9
Add more tests for non-alphanumeric strings.
ptodev Jan 31, 2025
00c8dca
Apply suggestions from code review
ptodev Feb 10, 2025
737cd0d
Apply suggestions from code review
ptodev Feb 10, 2025
9781fab
Add comments regarding the override registry for modules
ptodev Feb 10, 2025
5ec5380
Rename hashObject to objectFingerprint
ptodev Feb 11, 2025
8f2f642
add comment for the hash function
wildum Feb 12, 2025
43c4d74
add additional detail to the comment
wildum Feb 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
compute an ID for the foreach instances and add tests
wildum authored and ptodev committed Jan 27, 2025
commit 5174a334841aec074af943fb74784bed89ab1db2
35 changes: 31 additions & 4 deletions internal/runtime/internal/controller/node_config_foreach.go
Original file line number Diff line number Diff line change
@@ -2,6 +2,8 @@ package controller

import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"hash/fnv"
"sync"
@@ -16,7 +18,8 @@ type ForeachConfigNode struct {
label string
moduleController ModuleController

customComponents map[string]CustomComponent
customComponents map[string]CustomComponent
customComponentHashCounts map[string]int

forEachChildrenUpdateChan chan struct{} // used to trigger an update of the running children
forEachChildrenRunning bool
@@ -46,6 +49,7 @@ func NewForeachConfigNode(block *ast.BlockStmt, globals ComponentGlobals) *Forea
moduleController: globals.NewModuleController(globalID),
forEachChildrenUpdateChan: make(chan struct{}, 1),
customComponents: make(map[string]CustomComponent, 0),
customComponentHashCounts: make(map[string]int, 0),
}
}

@@ -88,10 +92,17 @@ func (fn *ForeachConfigNode) Evaluate(scope *vm.Scope) error {
// Loop through the items to create the custom components.
// On re-evaluation new components are added and existing ones are updated.
newCustomComponentIds := make(map[string]bool, len(args.Collection))
// find something for the ids because we cannot use numbers
tmp := []string{"aaa", "bbb", "ccc", "ddd"}
fn.customComponentHashCounts = make(map[string]int)
for i := 0; i < len(args.Collection); i++ {
customComponentID := tmp[i]

// We must create an ID from the collection entries to avoid recreating all components on every updates.
// We track the hash counts because the collection might contain duplicates ([1, 1, 1] would result in the same ids
// so we handle it by adding the count at the end -> [11, 12, 13]
customComponentID := fmt.Sprintf("foreach_%s", hashObject(args.Collection[i]))
count := fn.customComponentHashCounts[customComponentID] // count = 0 if the key is not found
fn.customComponentHashCounts[customComponentID] = count + 1
customComponentID += fmt.Sprintf("_%d", count+1)

cc, err := fn.getOrCreateCustomComponent(customComponentID)
if err != nil {
return err
@@ -219,3 +230,19 @@ func (fi *forEachChild) Hash() uint64 {
func (fi *forEachChild) Equals(other runner.Task) bool {
return fi.id == other.(*forEachChild).id
mattdurham marked this conversation as resolved.
Show resolved Hide resolved
}

func computeHash(s string) string {
hasher := sha256.New()
hasher.Write([]byte(s))
fullHash := hasher.Sum(nil)
return hex.EncodeToString(fullHash[:12]) // taking only the 12 first char of the hash should be enough
}

func hashObject(obj any) string {
ptodev marked this conversation as resolved.
Show resolved Hide resolved
switch v := obj.(type) {
case int, string, float64, bool:
return fmt.Sprintf("%v", v)
default:
return computeHash(fmt.Sprintf("%#v", v))
}
}
290 changes: 290 additions & 0 deletions internal/runtime/internal/controller/node_config_foreach_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,290 @@
package controller

import (
"context"
"os"
"sync/atomic"
"testing"
"time"

"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/runtime/logging"
"github.com/grafana/alloy/syntax/ast"
"github.com/grafana/alloy/syntax/parser"
"github.com/grafana/alloy/syntax/vm"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace/noop"
)

func TestCreateCustomComponents(t *testing.T) {
config := `foreach "default" {
mattdurham marked this conversation as resolved.
Show resolved Hide resolved
collection = [1, 2, 3]
var = "num"
template {
}
}`
foreachConfigNode := NewForeachConfigNode(getBlockFromConfig(t, config), getComponentGlobals(t))
require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(make(map[string]interface{}))))
customComponentIds := foreachConfigNode.moduleController.(*ModuleControllerMock).CustomComponents
require.ElementsMatch(t, customComponentIds, []string{"foreach_1_1", "foreach_2_1", "foreach_3_1"})
keys := make([]string, 0, len(foreachConfigNode.customComponents))
for key := range foreachConfigNode.customComponents {
keys = append(keys, key)
}
require.ElementsMatch(t, keys, []string{"foreach_1_1", "foreach_2_1", "foreach_3_1"})
}

func TestCreateCustomComponentsDuplicatedIds(t *testing.T) {
config := `foreach "default" {
collection = [1, 2, 1]
var = "num"
template {
}
}`
foreachConfigNode := NewForeachConfigNode(getBlockFromConfig(t, config), getComponentGlobals(t))
require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(make(map[string]interface{}))))
customComponentIds := foreachConfigNode.moduleController.(*ModuleControllerMock).CustomComponents
require.ElementsMatch(t, customComponentIds, []string{"foreach_1_1", "foreach_2_1", "foreach_1_2"})
keys := make([]string, 0, len(foreachConfigNode.customComponents))
for key := range foreachConfigNode.customComponents {
keys = append(keys, key)
}
require.ElementsMatch(t, keys, []string{"foreach_1_1", "foreach_2_1", "foreach_1_2"})
}

func TestCreateCustomComponentsWithUpdate(t *testing.T) {
config := `foreach "default" {
collection = [1, 2, 3]
var = "num"
template {
}
}`
foreachConfigNode := NewForeachConfigNode(getBlockFromConfig(t, config), getComponentGlobals(t))
require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(make(map[string]interface{}))))
customComponentIds := foreachConfigNode.moduleController.(*ModuleControllerMock).CustomComponents
require.ElementsMatch(t, customComponentIds, []string{"foreach_1_1", "foreach_2_1", "foreach_3_1"})
keys := make([]string, 0, len(foreachConfigNode.customComponents))
for key := range foreachConfigNode.customComponents {
keys = append(keys, key)
}
require.ElementsMatch(t, keys, []string{"foreach_1_1", "foreach_2_1", "foreach_3_1"})

newConfig := `foreach "default" {
collection = [2, 1, 1]
var = "num"
template {
}
}`
foreachConfigNode.moduleController.(*ModuleControllerMock).Reset()
foreachConfigNode.UpdateBlock(getBlockFromConfig(t, newConfig))
require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(make(map[string]interface{}))))
customComponentIds = foreachConfigNode.moduleController.(*ModuleControllerMock).CustomComponents

// Only the 2nd "1" item in the collection is created because the two others were already created.
require.ElementsMatch(t, customComponentIds, []string{"foreach_1_2"})

// "foreach31" was removed, "foreach12" was added
keys = make([]string, 0, len(foreachConfigNode.customComponents))
for key := range foreachConfigNode.customComponents {
keys = append(keys, key)
}
require.ElementsMatch(t, keys, []string{"foreach_1_1", "foreach_2_1", "foreach_1_2"})
}

func TestRunCustomComponents(t *testing.T) {
config := `foreach "default" {
collection = [1, 2, 3]
var = "num"
template {
}
}`
foreachConfigNode := NewForeachConfigNode(getBlockFromConfig(t, config), getComponentGlobals(t))
require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(make(map[string]interface{}))))
ctx, cancel := context.WithCancel(context.Background())
go foreachConfigNode.Run(ctx)

// check that all custom components are running correctly
require.EventuallyWithT(t, func(c *assert.CollectT) {
for _, cc := range foreachConfigNode.customComponents {
assert.True(c, cc.(*CustomComponentMock).IsRunning.Load())
}
}, 1*time.Second, 5*time.Millisecond)

cancel()
// check that all custom components are stopped
require.EventuallyWithT(t, func(c *assert.CollectT) {
for _, cc := range foreachConfigNode.customComponents {
assert.False(c, cc.(*CustomComponentMock).IsRunning.Load())
}
}, 1*time.Second, 5*time.Millisecond)
}

func TestRunCustomComponentsAfterUpdate(t *testing.T) {
config := `foreach "default" {
collection = [1, 2, 3]
var = "num"
template {
}
}`
foreachConfigNode := NewForeachConfigNode(getBlockFromConfig(t, config), getComponentGlobals(t))
require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(make(map[string]interface{}))))
ctx, cancel := context.WithCancel(context.Background())
go foreachConfigNode.Run(ctx)

// check that all custom components are running correctly
require.EventuallyWithT(t, func(c *assert.CollectT) {
for _, cc := range foreachConfigNode.customComponents {
assert.True(c, cc.(*CustomComponentMock).IsRunning.Load())
}
}, 1*time.Second, 5*time.Millisecond)

newConfig := `foreach "default" {
collection = [2, 1, 1]
var = "num"
template {
}
}`
foreachConfigNode.moduleController.(*ModuleControllerMock).Reset()
foreachConfigNode.UpdateBlock(getBlockFromConfig(t, newConfig))
require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(make(map[string]interface{}))))

newComponentIds := []string{"foreach_1_1", "foreach_2_1", "foreach_1_2"}
// check that all new custom components are running correctly
require.EventuallyWithT(t, func(c *assert.CollectT) {
for id, cc := range foreachConfigNode.customComponents {
assert.Contains(c, newComponentIds, id)
assert.True(c, cc.(*CustomComponentMock).IsRunning.Load())
}
}, 1*time.Second, 5*time.Millisecond)

cancel()
// check that all custom components are stopped
require.EventuallyWithT(t, func(c *assert.CollectT) {
for _, cc := range foreachConfigNode.customComponents {
assert.False(c, cc.(*CustomComponentMock).IsRunning.Load())
}
}, 1*time.Second, 5*time.Millisecond)
}

func TestCreateCustomComponentsCollectionObjectsWithUpdate(t *testing.T) {
config := `foreach "default" {
collection = [obj1, obj2]
var = "num"
template {
}
}`
foreachConfigNode := NewForeachConfigNode(getBlockFromConfig(t, config), getComponentGlobals(t))
vars := map[string]interface{}{
"obj1": map[string]string{
"label1": "a",
"label2": "b",
},
"obj2": map[string]string{
"label3": "c",
},
}
require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(vars)))
customComponentIds := foreachConfigNode.moduleController.(*ModuleControllerMock).CustomComponents
require.ElementsMatch(t, customComponentIds, []string{"foreach_be19d02a2ccb2cbc2c47e90d_1", "foreach_b335d50e2e8490eb8bf5f51b_1"})
keys := make([]string, 0, len(foreachConfigNode.customComponents))
for key := range foreachConfigNode.customComponents {
keys = append(keys, key)
}
require.ElementsMatch(t, keys, []string{"foreach_be19d02a2ccb2cbc2c47e90d_1", "foreach_b335d50e2e8490eb8bf5f51b_1"})

newConfig := `foreach "default" {
collection = [obj1, obj3]
var = "num"
template {
}
}`
vars2 := map[string]interface{}{
"obj1": map[string]string{
"label1": "a",
"label2": "b",
},
"obj3": map[string]string{
"label3": "d",
},
}
foreachConfigNode.moduleController.(*ModuleControllerMock).Reset()
foreachConfigNode.UpdateBlock(getBlockFromConfig(t, newConfig))
require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(vars2)))
customComponentIds = foreachConfigNode.moduleController.(*ModuleControllerMock).CustomComponents

// Create only the custom component for the obj3 because the one for obj1 was already created
require.ElementsMatch(t, customComponentIds, []string{"foreach_1464766cf9c8fd1095d0f7a2_1"})

// "foreachb335d50e2e8490eb8bf5f51b1" was removed, "foreach1464766cf9c8fd1095d0f7a21" was added
keys = make([]string, 0, len(foreachConfigNode.customComponents))
for key := range foreachConfigNode.customComponents {
keys = append(keys, key)
}
require.ElementsMatch(t, keys, []string{"foreach_be19d02a2ccb2cbc2c47e90d_1", "foreach_1464766cf9c8fd1095d0f7a2_1"})
}

func getBlockFromConfig(t *testing.T, config string) *ast.BlockStmt {
file, err := parser.ParseFile("", []byte(config))
require.NoError(t, err)
return file.Body[0].(*ast.BlockStmt)
}

func getComponentGlobals(t *testing.T) ComponentGlobals {
l, _ := logging.New(os.Stderr, logging.DefaultOptions)
return ComponentGlobals{
Logger: l,
TraceProvider: noop.NewTracerProvider(),
DataPath: t.TempDir(),
MinStability: featuregate.StabilityGenerallyAvailable,
OnBlockNodeUpdate: func(cn BlockNode) { /* no-op */ },
Registerer: prometheus.NewRegistry(),
NewModuleController: func(id string) ModuleController {
return NewModuleControllerMock()
},
}
}

type ModuleControllerMock struct {
CustomComponents []string
}

func NewModuleControllerMock() ModuleController {
return &ModuleControllerMock{
CustomComponents: make([]string, 0),
}
}

func (m *ModuleControllerMock) NewModule(id string, export component.ExportFunc) (component.Module, error) {
return nil, nil
}

func (m *ModuleControllerMock) ModuleIDs() []string {
return nil
}

func (m *ModuleControllerMock) NewCustomComponent(id string, export component.ExportFunc) (CustomComponent, error) {
m.CustomComponents = append(m.CustomComponents, id)
return &CustomComponentMock{}, nil
}

func (m *ModuleControllerMock) Reset() {
m.CustomComponents = make([]string, 0)
}

type CustomComponentMock struct {
IsRunning atomic.Bool
}

func (c *CustomComponentMock) LoadBody(body ast.Body, args map[string]any, customComponentRegistry *CustomComponentRegistry) error {
return nil
}

func (c *CustomComponentMock) Run(ctx context.Context) error {
c.IsRunning.Store(true)
<-ctx.Done()
c.IsRunning.Store(false)
return nil
}