Skip to content

Commit

Permalink
Foreach prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
ptodev committed Oct 29, 2024
1 parent 702491d commit dd602e6
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 1 deletion.
86 changes: 86 additions & 0 deletions internal/runtime/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,35 @@ func buildTestImportFile(t *testing.T, filename string) testImportFile {
return tc
}

// This is a copy of TestImportFile.
// It may need to be modified further to make it work with a foreach.
func TestForeach(t *testing.T) {
directory := "./testdata/foreach"
for _, file := range getTestFiles(directory, t) {
tc := buildTestImportFile(t, filepath.Join(directory, file.Name()))
t.Run(tc.description, func(t *testing.T) {
defer os.Remove("module.alloy")
require.NoError(t, os.WriteFile("module.alloy", []byte(tc.module), 0664))
if tc.nestedModule != "" {
defer os.Remove("nested_module.alloy")
require.NoError(t, os.WriteFile("nested_module.alloy", []byte(tc.nestedModule), 0664))
}
if tc.otherNestedModule != "" {
defer os.Remove("other_nested_module.alloy")
require.NoError(t, os.WriteFile("other_nested_module.alloy", []byte(tc.otherNestedModule), 0664))
}

if tc.update != nil {
testConfig2(t, tc.main, tc.reloadConfig, func() {
require.NoError(t, os.WriteFile(tc.update.name, []byte(tc.update.updateConfig), 0664))
})
} else {
testConfig2(t, tc.main, tc.reloadConfig, nil)
}
})
}
}

func TestImportFile(t *testing.T) {
directory := "./testdata/import_file"
for _, file := range getTestFiles(directory, t) {
Expand Down Expand Up @@ -314,6 +343,63 @@ func testConfig(t *testing.T, config string, reloadConfig string, update func())
}
}

// This function is a copy of testConfig above.
func testConfig2(t *testing.T, config string, reloadConfig string, update func()) {
defer verifyNoGoroutineLeaks(t)
ctrl, f := setup(t, config)

err := ctrl.LoadSource(f, nil)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
defer func() {
cancel()
wg.Wait()
}()

wg.Add(1)
go func() {
defer wg.Done()
ctrl.Run(ctx)
}()

// Check for initial condition
require.Eventually(t, func() bool {
export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation2.final")
// If each iteration of the for loop adds a 1,
// and there are 3 iterations, we expect 3 to be the end result.
//TODO: Make this configurable?
return export.Sum == 3
}, 3*time.Second, 10*time.Millisecond)

// if update != nil {
// update()

// // Export should be -10 after update
// require.Eventually(t, func() bool {
// export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum")
// return export.LastAdded <= -10
// }, 3*time.Second, 10*time.Millisecond)
// }

// if reloadConfig != "" {

Check failure on line 386 in internal/runtime/import_test.go

View workflow job for this annotation

GitHub Actions / Test (macos-latest-xlarge)

not enough arguments in call to ctrl.LoadSource
// f, err = alloy_runtime.ParseSource(t.Name(), []byte(reloadConfig))
// require.NoError(t, err)
// require.NotNil(t, f)

// // Reload the controller with the new config.
// err = ctrl.LoadSource(f, nil)
// require.NoError(t, err)

// // Export should be -10 after update
// require.Eventually(t, func() bool {
// export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum")
// return export.LastAdded <= -10
// }, 3*time.Second, 10*time.Millisecond)
// }
}

func testConfigError(t *testing.T, config string, expectedError string) {
defer verifyNoGoroutineLeaks(t)
ctrl, f := setup(t, config)
Expand Down
6 changes: 6 additions & 0 deletions internal/runtime/internal/controller/node_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ func NewConfigNode(block *ast.BlockStmt, globals ComponentGlobals) (BlockNode, d
return NewTracingConfigNode(block, globals), nil
case importsource.BlockImportFile, importsource.BlockImportString, importsource.BlockImportHTTP, importsource.BlockImportGit:
return NewImportConfigNode(block, globals, importsource.GetSourceType(block.GetBlockName())), nil
case importsource.BlockForeach:
return NewForeachConfigNode(block, globals), nil
default:
var diags diag.Diagnostics
diags.Add(diag.Diagnostic{
Expand All @@ -50,6 +52,7 @@ type ConfigNodeMap struct {
argumentMap map[string]*ArgumentConfigNode
exportMap map[string]*ExportConfigNode
importMap map[string]*ImportConfigNode
foreachMap map[string]*ForeachConfigNode
}

// NewConfigNodeMap will create an initial ConfigNodeMap. Append must be called
Expand All @@ -61,6 +64,7 @@ func NewConfigNodeMap() *ConfigNodeMap {
argumentMap: map[string]*ArgumentConfigNode{},
exportMap: map[string]*ExportConfigNode{},
importMap: map[string]*ImportConfigNode{},
foreachMap: map[string]*ForeachConfigNode{},
}
}

Expand All @@ -80,6 +84,8 @@ func (nodeMap *ConfigNodeMap) Append(configNode BlockNode) diag.Diagnostics {
nodeMap.tracing = n
case *ImportConfigNode:
nodeMap.importMap[n.Label()] = n
case *ForeachConfigNode:
nodeMap.foreachMap[n.Label()] = n
default:
diags.Add(diag.Diagnostic{
Severity: diag.SeverityLevelError,
Expand Down
47 changes: 47 additions & 0 deletions internal/runtime/internal/controller/node_config_foreach.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package controller

import (
"github.com/grafana/alloy/syntax/ast"
"github.com/grafana/alloy/syntax/vm"
)

type ForeachConfigNode struct {
nodeID string
label string
block *ast.BlockStmt // Current Alloy blocks to derive config from
}

var _ BlockNode = (*ForeachConfigNode)(nil)

// For now the Foreach doesn't have the ability to export arguments.
//TODO: We could implement this in the future?

type ForeachArguments struct {
Collection string `alloy:"collection,attr`
//TODO: Is the "var" argument really needed?
// We could just have a variable with a fixed name referencing the current thing we are iterating over.
Var string `alloy:"var,attr,optional`
}

func NewForeachConfigNode(block *ast.BlockStmt, globals ComponentGlobals) *ForeachConfigNode {
nodeID := BlockComponentID(block).String()

return &ForeachConfigNode{
nodeID: nodeID,
label: block.Label,
block: block,
}
}

func (fn *ForeachConfigNode) Label() string { return fn.label }

func (fn *ForeachConfigNode) NodeID() string { return fn.nodeID }

func (fn *ForeachConfigNode) Block() *ast.BlockStmt { return fn.block }

func (fn *ForeachConfigNode) Evaluate(scope *vm.Scope) error {
return nil
}

func (fn *ForeachConfigNode) UpdateBlock(b *ast.BlockStmt) {
}
4 changes: 4 additions & 0 deletions internal/runtime/internal/importsource/import_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ const (
String
Git
HTTP
Foreach
)

const (
BlockImportFile = "import.file"
BlockImportString = "import.string"
BlockImportHTTP = "import.http"
BlockImportGit = "import.git"
BlockForeach = "foreach"
)

// ImportSource retrieves a module from a source.
Expand Down Expand Up @@ -63,6 +65,8 @@ func GetSourceType(fullName string) SourceType {
return HTTP
case BlockImportGit:
return Git
case BlockForeach:
return Foreach
}
panic(fmt.Errorf("name does not map to a known source type: %v", fullName))
}
73 changes: 73 additions & 0 deletions internal/runtime/internal/testcomponents/sumation1.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package testcomponents

import (
"context"

"github.com/go-kit/log"
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/runtime/logging/level"
"go.uber.org/atomic"
)

func init() {
component.Register(component.Registration{
Name: "testcomponents.summation1",
Stability: featuregate.StabilityPublicPreview,
Args: SummationConfig_1{},
Exports: SummationExports_1{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return NewSummation(opts, args.(SummationConfig))
},
})
}

type SummationConfig_1 struct {
Input int `alloy:"input,attr"`
//TODO: What should the type be?
ForwardTo []string `alloy:"forward_to,attr"`
}

type SummationExports_1 struct {
Sum int `alloy:"sum,attr"`
LastAdded int `alloy:"last_added,attr"`
}

type Summation_1 struct {
opts component.Options
log log.Logger
sum atomic.Int32
}

// NewSummation creates a new summation component.
func NewSummation_1(o component.Options, cfg SummationConfig) (*Summation, error) {
t := &Summation{opts: o, log: o.Logger}
if err := t.Update(cfg); err != nil {
return nil, err
}
return t, nil
}

var (
_ component.Component = (*Summation)(nil)
)

// Run implements Component.
func (t *Summation_1) Run(ctx context.Context) error {
<-ctx.Done()
return nil
}

// Update implements Component.
func (t *Summation_1) Update(args component.Arguments) error {
c := args.(SummationConfig)
newSum := int(t.sum.Add(int32(c.Input)))

level.Info(t.log).Log("msg", "updated sum", "value", newSum, "input", c.Input)
t.opts.OnStateChange(SummationExports{Sum: newSum, LastAdded: c.Input})

//TODO: Send the data over to all the receivers listed in forward_to

return nil
}
4 changes: 4 additions & 0 deletions internal/runtime/internal/testcomponents/sumation2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package testcomponents

//TODO: Implement the component proposed in:
// alloy/internal/runtime/testdata/foreach/foreach_1.txtar
2 changes: 1 addition & 1 deletion internal/runtime/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func sourceFromBody(body ast.Body) (*Source, error) {
switch fullName {
case "declare":
declares = append(declares, stmt)
case "logging", "tracing", "argument", "export", "import.file", "import.string", "import.http", "import.git":
case "logging", "tracing", "argument", "export", "import.file", "import.string", "import.http", "import.git", "foreach":
configs = append(configs, stmt)
default:
components = append(components, stmt)
Expand Down
17 changes: 17 additions & 0 deletions internal/runtime/testdata/foreach/foreach_1.txtar
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- main.alloy --
foreach "testForeach" {
collection = [1, 2, 3, 4]
//var = "num"

// Similar to testcomponents.summation, but with a "forward_to"
testcomponents.summation1 "sum" {
//TODO: Use the num variable here
// input = num
input = 1
forward_to = testcomponents.summation2.final.receiver
}
}

// Similar to testcomponents.summation, but with a "receiver" export
testcomponents.summation2 "final" {
}

0 comments on commit dd602e6

Please sign in to comment.