diff --git a/internal/runtime/import_test.go b/internal/runtime/import_test.go index dab9400d31..f0e3f4364c 100644 --- a/internal/runtime/import_test.go +++ b/internal/runtime/import_test.go @@ -75,6 +75,35 @@ func buildTestImportFile(t *testing.T, filename string) testImportFile { return tc } +// This is a copy of TestImportFile. +// It may need to be modified further to make it work with a foreach. +func TestForeach(t *testing.T) { + directory := "./testdata/foreach" + for _, file := range getTestFiles(directory, t) { + tc := buildTestImportFile(t, filepath.Join(directory, file.Name())) + t.Run(tc.description, func(t *testing.T) { + defer os.Remove("module.alloy") + require.NoError(t, os.WriteFile("module.alloy", []byte(tc.module), 0664)) + if tc.nestedModule != "" { + defer os.Remove("nested_module.alloy") + require.NoError(t, os.WriteFile("nested_module.alloy", []byte(tc.nestedModule), 0664)) + } + if tc.otherNestedModule != "" { + defer os.Remove("other_nested_module.alloy") + require.NoError(t, os.WriteFile("other_nested_module.alloy", []byte(tc.otherNestedModule), 0664)) + } + + if tc.update != nil { + testConfig2(t, tc.main, tc.reloadConfig, func() { + require.NoError(t, os.WriteFile(tc.update.name, []byte(tc.update.updateConfig), 0664)) + }) + } else { + testConfig2(t, tc.main, tc.reloadConfig, nil) + } + }) + } +} + func TestImportFile(t *testing.T) { directory := "./testdata/import_file" for _, file := range getTestFiles(directory, t) { @@ -314,6 +343,63 @@ func testConfig(t *testing.T, config string, reloadConfig string, update func()) } } +// This function is a copy of testConfig above. +func testConfig2(t *testing.T, config string, reloadConfig string, update func()) { + defer verifyNoGoroutineLeaks(t) + ctrl, f := setup(t, config) + + err := ctrl.LoadSource(f, nil) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + defer func() { + cancel() + wg.Wait() + }() + + wg.Add(1) + go func() { + defer wg.Done() + ctrl.Run(ctx) + }() + + // Check for initial condition + require.Eventually(t, func() bool { + export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation2.final") + // If each iteration of the for loop adds a 1, + // and there are 3 iterations, we expect 3 to be the end result. + //TODO: Make this configurable? + return export.Sum == 3 + }, 3*time.Second, 10*time.Millisecond) + + // if update != nil { + // update() + + // // Export should be -10 after update + // require.Eventually(t, func() bool { + // export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum") + // return export.LastAdded <= -10 + // }, 3*time.Second, 10*time.Millisecond) + // } + + // if reloadConfig != "" { + // f, err = alloy_runtime.ParseSource(t.Name(), []byte(reloadConfig)) + // require.NoError(t, err) + // require.NotNil(t, f) + + // // Reload the controller with the new config. + // err = ctrl.LoadSource(f, nil) + // require.NoError(t, err) + + // // Export should be -10 after update + // require.Eventually(t, func() bool { + // export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum") + // return export.LastAdded <= -10 + // }, 3*time.Second, 10*time.Millisecond) + // } +} + func testConfigError(t *testing.T, config string, expectedError string) { defer verifyNoGoroutineLeaks(t) ctrl, f := setup(t, config) diff --git a/internal/runtime/internal/controller/node_config.go b/internal/runtime/internal/controller/node_config.go index 1293eac305..5338237caf 100644 --- a/internal/runtime/internal/controller/node_config.go +++ b/internal/runtime/internal/controller/node_config.go @@ -29,6 +29,8 @@ func NewConfigNode(block *ast.BlockStmt, globals ComponentGlobals) (BlockNode, d return NewTracingConfigNode(block, globals), nil case importsource.BlockImportFile, importsource.BlockImportString, importsource.BlockImportHTTP, importsource.BlockImportGit: return NewImportConfigNode(block, globals, importsource.GetSourceType(block.GetBlockName())), nil + case importsource.BlockForeach: + return NewForeachConfigNode(block, globals), nil default: var diags diag.Diagnostics diags.Add(diag.Diagnostic{ @@ -50,6 +52,7 @@ type ConfigNodeMap struct { argumentMap map[string]*ArgumentConfigNode exportMap map[string]*ExportConfigNode importMap map[string]*ImportConfigNode + foreachMap map[string]*ForeachConfigNode } // NewConfigNodeMap will create an initial ConfigNodeMap. Append must be called @@ -61,6 +64,7 @@ func NewConfigNodeMap() *ConfigNodeMap { argumentMap: map[string]*ArgumentConfigNode{}, exportMap: map[string]*ExportConfigNode{}, importMap: map[string]*ImportConfigNode{}, + foreachMap: map[string]*ForeachConfigNode{}, } } @@ -80,6 +84,8 @@ func (nodeMap *ConfigNodeMap) Append(configNode BlockNode) diag.Diagnostics { nodeMap.tracing = n case *ImportConfigNode: nodeMap.importMap[n.Label()] = n + case *ForeachConfigNode: + nodeMap.foreachMap[n.Label()] = n default: diags.Add(diag.Diagnostic{ Severity: diag.SeverityLevelError, diff --git a/internal/runtime/internal/controller/node_config_foreach.go b/internal/runtime/internal/controller/node_config_foreach.go new file mode 100644 index 0000000000..f27553832d --- /dev/null +++ b/internal/runtime/internal/controller/node_config_foreach.go @@ -0,0 +1,47 @@ +package controller + +import ( + "github.com/grafana/alloy/syntax/ast" + "github.com/grafana/alloy/syntax/vm" +) + +type ForeachConfigNode struct { + nodeID string + label string + block *ast.BlockStmt // Current Alloy blocks to derive config from +} + +var _ BlockNode = (*ForeachConfigNode)(nil) + +// For now the Foreach doesn't have the ability to export arguments. +//TODO: We could implement this in the future? + +type ForeachArguments struct { + Collection string `alloy:"collection,attr` + //TODO: Is the "var" argument really needed? + // We could just have a variable with a fixed name referencing the current thing we are iterating over. + Var string `alloy:"var,attr,optional` +} + +func NewForeachConfigNode(block *ast.BlockStmt, globals ComponentGlobals) *ForeachConfigNode { + nodeID := BlockComponentID(block).String() + + return &ForeachConfigNode{ + nodeID: nodeID, + label: block.Label, + block: block, + } +} + +func (fn *ForeachConfigNode) Label() string { return fn.label } + +func (fn *ForeachConfigNode) NodeID() string { return fn.nodeID } + +func (fn *ForeachConfigNode) Block() *ast.BlockStmt { return fn.block } + +func (fn *ForeachConfigNode) Evaluate(scope *vm.Scope) error { + return nil +} + +func (fn *ForeachConfigNode) UpdateBlock(b *ast.BlockStmt) { +} diff --git a/internal/runtime/internal/importsource/import_source.go b/internal/runtime/internal/importsource/import_source.go index 79686d6735..cf2833d1b4 100644 --- a/internal/runtime/internal/importsource/import_source.go +++ b/internal/runtime/internal/importsource/import_source.go @@ -15,6 +15,7 @@ const ( String Git HTTP + Foreach ) const ( @@ -22,6 +23,7 @@ const ( BlockImportString = "import.string" BlockImportHTTP = "import.http" BlockImportGit = "import.git" + BlockForeach = "foreach" ) // ImportSource retrieves a module from a source. @@ -63,6 +65,8 @@ func GetSourceType(fullName string) SourceType { return HTTP case BlockImportGit: return Git + case BlockForeach: + return Foreach } panic(fmt.Errorf("name does not map to a known source type: %v", fullName)) } diff --git a/internal/runtime/internal/testcomponents/sumation1.go b/internal/runtime/internal/testcomponents/sumation1.go new file mode 100644 index 0000000000..8a02ac4697 --- /dev/null +++ b/internal/runtime/internal/testcomponents/sumation1.go @@ -0,0 +1,73 @@ +package testcomponents + +import ( + "context" + + "github.com/go-kit/log" + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/runtime/logging/level" + "go.uber.org/atomic" +) + +func init() { + component.Register(component.Registration{ + Name: "testcomponents.summation1", + Stability: featuregate.StabilityPublicPreview, + Args: SummationConfig_1{}, + Exports: SummationExports_1{}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + return NewSummation(opts, args.(SummationConfig)) + }, + }) +} + +type SummationConfig_1 struct { + Input int `alloy:"input,attr"` + //TODO: What should the type be? + ForwardTo []string `alloy:"forward_to,attr"` +} + +type SummationExports_1 struct { + Sum int `alloy:"sum,attr"` + LastAdded int `alloy:"last_added,attr"` +} + +type Summation_1 struct { + opts component.Options + log log.Logger + sum atomic.Int32 +} + +// NewSummation creates a new summation component. +func NewSummation_1(o component.Options, cfg SummationConfig) (*Summation, error) { + t := &Summation{opts: o, log: o.Logger} + if err := t.Update(cfg); err != nil { + return nil, err + } + return t, nil +} + +var ( + _ component.Component = (*Summation)(nil) +) + +// Run implements Component. +func (t *Summation_1) Run(ctx context.Context) error { + <-ctx.Done() + return nil +} + +// Update implements Component. +func (t *Summation_1) Update(args component.Arguments) error { + c := args.(SummationConfig) + newSum := int(t.sum.Add(int32(c.Input))) + + level.Info(t.log).Log("msg", "updated sum", "value", newSum, "input", c.Input) + t.opts.OnStateChange(SummationExports{Sum: newSum, LastAdded: c.Input}) + + //TODO: Send the data over to all the receivers listed in forward_to + + return nil +} diff --git a/internal/runtime/internal/testcomponents/sumation2.go b/internal/runtime/internal/testcomponents/sumation2.go new file mode 100644 index 0000000000..44ed7e876f --- /dev/null +++ b/internal/runtime/internal/testcomponents/sumation2.go @@ -0,0 +1,4 @@ +package testcomponents + +//TODO: Implement the component proposed in: +// alloy/internal/runtime/testdata/foreach/foreach_1.txtar diff --git a/internal/runtime/source.go b/internal/runtime/source.go index f02e71c6e4..88d779400f 100644 --- a/internal/runtime/source.go +++ b/internal/runtime/source.go @@ -75,7 +75,7 @@ func sourceFromBody(body ast.Body) (*Source, error) { switch fullName { case "declare": declares = append(declares, stmt) - case "logging", "tracing", "argument", "export", "import.file", "import.string", "import.http", "import.git": + case "logging", "tracing", "argument", "export", "import.file", "import.string", "import.http", "import.git", "foreach": configs = append(configs, stmt) default: components = append(components, stmt) diff --git a/internal/runtime/testdata/foreach/foreach_1.txtar b/internal/runtime/testdata/foreach/foreach_1.txtar new file mode 100644 index 0000000000..9c372942a6 --- /dev/null +++ b/internal/runtime/testdata/foreach/foreach_1.txtar @@ -0,0 +1,17 @@ +-- main.alloy -- +foreach "testForeach" { + collection = [1, 2, 3, 4] + //var = "num" + + // Similar to testcomponents.summation, but with a "forward_to" + testcomponents.summation1 "sum" { + //TODO: Use the num variable here + // input = num + input = 1 + forward_to = testcomponents.summation2.final.receiver + } +} + +// Similar to testcomponents.summation, but with a "receiver" export +testcomponents.summation2 "final" { +} \ No newline at end of file