diff --git a/integration-tests/tests/module-file/loki.river b/integration-tests/tests/module-file/loki.river index c5c57e4aa9b8..63aaa960976b 100644 --- a/integration-tests/tests/module-file/loki.river +++ b/integration-tests/tests/module-file/loki.river @@ -2,20 +2,16 @@ import.file "logTarget" { filename = "logfile.river" } +import.file "write" { + filename = "loki_write.river" +} + declare "loki" { argument "file" {} loki.source.file "test" { targets = argument.file.value - forward_to = [loki.write.test.receiver] - } - - loki.write "test" { - endpoint { - url = "http://localhost:3100/loki/api/v1/push" - } - external_labels = { - test_name = "module_file", - } + forward_to = [write.loki_write.default.receiver] } + write.loki_write "default" {} } diff --git a/integration-tests/tests/module-file/loki_write.river b/integration-tests/tests/module-file/loki_write.river new file mode 100644 index 000000000000..03c9ed44b66b --- /dev/null +++ b/integration-tests/tests/module-file/loki_write.river @@ -0,0 +1,13 @@ +declare "loki_write" { + loki.write "test" { + endpoint { + url = "http://localhost:3100/loki/api/v1/push" + } + external_labels = { + test_name = "module_file", + } + } + export "receiver" { + value = loki.write.test.receiver + } +} \ No newline at end of file diff --git a/pkg/flow/internal/controller/loader.go b/pkg/flow/internal/controller/loader.go index 64be9cce8dbb..5700268ea54d 100644 --- a/pkg/flow/internal/controller/loader.go +++ b/pkg/flow/internal/controller/loader.go @@ -556,7 +556,7 @@ func (l *Loader) wireModuleDependencies(g *dag.Graph, dc *DeclareComponentNode, references = deps } else { var err error - references, err = GetModuleReferences(declareNode, l.importNodes, l.declareNodes, l.parentModuleDependencies) + references, err = GetModuleReferences(declareNode.content, l.importNodes, l.declareNodes, l.parentModuleDependencies) if err != nil { return err } @@ -883,6 +883,26 @@ func (l *Loader) getModuleInfo(fullName string, namespace string, module string) if err != nil { return moduleInfo, err } + + // The import node might need its nested imported content. + // We need to pass the correct content according to the namespace to respect the scope. + lastIndex := strings.LastIndex(module, ".") + if lastIndex != -1 { + scope := module[:lastIndex] + moduleInfo.moduleDependencies = make(map[string]string) + for importedMod, importedModContent := range node.importedContent { + if strings.HasPrefix(importedMod, scope) { + moduleInfo.moduleDependencies[strings.TrimPrefix(importedMod, scope+".")] = importedModContent + } + } + } else { + // In this case the declare is only at depth 1 which corresponds to the importedContent, so we can just pass everything. + moduleInfo.moduleDependencies = node.importedContent + } + + if err != nil { + return moduleInfo, err + } } else if c, ok := l.parentModuleDependencies[fullName]; ok { content = c } else { diff --git a/pkg/flow/internal/controller/module_references.go b/pkg/flow/internal/controller/module_references.go index 338203b05a14..839ff8b0a6e8 100644 --- a/pkg/flow/internal/controller/module_references.go +++ b/pkg/flow/internal/controller/module_references.go @@ -15,13 +15,13 @@ type ModuleReference struct { } func GetModuleReferences( - declareNode *DeclareNode, + content string, importNodes map[string]*ImportConfigNode, declareNodes map[string]*DeclareNode, parentModuleDependencies map[string]string, ) ([]ModuleReference, error) { uniqueReferences := make(map[string]ModuleReference) - err := getModuleReferences(declareNode.content, importNodes, declareNodes, uniqueReferences, parentModuleDependencies) + err := getModuleReferences(content, importNodes, declareNodes, uniqueReferences, parentModuleDependencies) if err != nil { return nil, err } diff --git a/pkg/flow/internal/controller/node_config_import.go b/pkg/flow/internal/controller/node_config_import.go index 101f469b3b26..ff63b6c0fc0a 100644 --- a/pkg/flow/internal/controller/node_config_import.go +++ b/pkg/flow/internal/controller/node_config_import.go @@ -31,7 +31,7 @@ type ImportConfigNode struct { source importsource.ImportSource registry *prometheus.Registry importedContent map[string]string - importConfigNodesChildren map[importsource.SourceType]map[string]*ImportConfigNode + importConfigNodesChildren map[string]*ImportConfigNode OnComponentUpdate func(cn NodeWithDependants) // Informs controller that we need to reevaluate logger log.Logger inContentUpdate bool @@ -173,16 +173,13 @@ func (cn *ImportConfigNode) processDeclareBlock(stmt *ast.BlockStmt, content str // processDeclareBlock processes an import block. func (cn *ImportConfigNode) processImportBlock(stmt *ast.BlockStmt, fullName string) { sourceType := importsource.GetSourceType(fullName) - if _, ok := cn.importConfigNodesChildren[sourceType][stmt.Label]; ok { + if _, ok := cn.importConfigNodesChildren[stmt.Label]; ok { level.Error(cn.logger).Log("msg", "import block redefined", "name", stmt.Label) return } childGlobals := cn.globals childGlobals.OnComponentUpdate = cn.OnChildrenContentUpdate - if cn.importConfigNodesChildren[sourceType] == nil { - cn.importConfigNodesChildren[sourceType] = make(map[string]*ImportConfigNode) - } - cn.importConfigNodesChildren[sourceType][stmt.Label] = NewImportConfigNode(stmt, childGlobals, sourceType) + cn.importConfigNodesChildren[stmt.Label] = NewImportConfigNode(stmt, childGlobals, sourceType) } // onContentUpdate is triggered every time the managed import component has new content. @@ -192,7 +189,7 @@ func (cn *ImportConfigNode) onContentUpdate(content string) { cn.inContentUpdate = true cn.importedContent = make(map[string]string) // TODO: We recreate the nodes when the content changes. Can we copy instead for optimization? - cn.importConfigNodesChildren = make(map[importsource.SourceType]map[string]*ImportConfigNode) + cn.importConfigNodesChildren = make(map[string]*ImportConfigNode) node, err := parser.ParseFile(cn.label, []byte(content)) if err != nil { level.Error(cn.logger).Log("msg", "failed to parse file on update", "err", err) @@ -207,13 +204,11 @@ func (cn *ImportConfigNode) onContentUpdate(content string) { // evaluateChildren evaluates the import nodes managed by this import node. func (cn *ImportConfigNode) evaluateChildren() { - for _, sourceTypeChildren := range cn.importConfigNodesChildren { - for _, child := range sourceTypeChildren { - child.Evaluate(&vm.Scope{ - Parent: nil, - Variables: make(map[string]interface{}), - }) - } + for _, child := range cn.importConfigNodesChildren { + child.Evaluate(&vm.Scope{ + Parent: nil, + Variables: make(map[string]interface{}), + }) } } diff --git a/pkg/flow/module_import_test.go b/pkg/flow/module_import_test.go index 0637059ce610..09078dc560d4 100644 --- a/pkg/flow/module_import_test.go +++ b/pkg/flow/module_import_test.go @@ -15,12 +15,13 @@ import ( func TestImportModule(t *testing.T) { testCases := []struct { - name string - module string - otherModule string - config string - updateModule func(filename string) string - updateFile string + name string + module string + otherModule string + yetAnotherModule string + config string + updateModule func(filename string) string + updateFile string }{ { name: "TestImportModule", @@ -212,22 +213,22 @@ func TestImportModule(t *testing.T) { filename = "my_module" } - declare "anotherModule" { + declare "anotherModule" { testcomponents.count "inc" { frequency = "10ms" max = 10 } - testImport.otherModule.test "myModule" { - input = testcomponents.count.inc.count - } + testImport.otherModule.test "myModule" { + input = testcomponents.count.inc.count + } export "output" { value = testImport.otherModule.test.myModule.output } } - anotherModule "myOtherModule" {} + anotherModule "myOtherModule" {} testcomponents.summation "sum" { input = anotherModule.myOtherModule.output @@ -308,7 +309,7 @@ func TestImportModule(t *testing.T) { } } - yetAgainAnotherModule "default" {} + yetAgainAnotherModule "default" {} testcomponents.summation "sum" { input = yetAgainAnotherModule.default.output @@ -334,6 +335,213 @@ func TestImportModule(t *testing.T) { }, updateFile: "other_module", }, + { + name: "TestImportedModuleUsedInImportedFileWithDepth1", + module: ` + import.file "otherModule" { + filename = "other_module" + } + declare "anotherModule" { + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + otherModule.test "default" { + input = testcomponents.count.inc.count + } + + export "output" { + value = otherModule.test.default.output + } + } + `, + otherModule: ` + declare "test" { + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "output" { + value = testcomponents.passthrough.pt.output + } + } + `, + config: ` + import.file "testImport" { + filename = "my_module" + } + + testImport.anotherModule "myOtherModule" {} + + testcomponents.summation "sum" { + input = testImport.anotherModule.myOtherModule.output + } + `, + updateModule: func(filename string) string { + return ` + declare "test" { + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "output" { + value = -10 + } + } + ` + }, + updateFile: "other_module", + }, + { + name: "TestDeclaredModuleUsedInImportedFileWithDepth2", + module: ` + import.file "otherModule" { + filename = "other_module" + } + `, + otherModule: ` + declare "anotherModule" { + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + test "default" { + input = testcomponents.count.inc.count + } + + export "output" { + value = test.default.output + } + } + declare "test" { + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "output" { + value = testcomponents.passthrough.pt.output + } + } + `, + config: ` + import.file "testImport" { + filename = "my_module" + } + + testImport.otherModule.anotherModule "myOtherModule" {} + + testcomponents.summation "sum" { + input = testImport.otherModule.anotherModule.myOtherModule.output + } + `, + updateModule: func(filename string) string { + return ` + declare "anotherModule" { + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + test "default" { + input = testcomponents.count.inc.count + } + + export "output" { + value = test.default.output + } + } + declare "test" { + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "output" { + value = -10 + } + } + ` + }, + updateFile: "other_module", + }, + { + name: "TestDeclaredModuleUsedInImportedFileWithDepth3", + module: ` + import.file "otherModule" { + filename = "other_module" + } + import.file "uselessImportToSeeIfItBreaks" { + filename = "yet_another_module" + } + `, + otherModule: ` + import.file "default" { + filename = "yet_another_module" + } + declare "anotherModule" { + testcomponents.count "inc" { + frequency = "10ms" + max = 10 + } + + default.test "default" { + input = testcomponents.count.inc.count + } + + export "output" { + value = default.test.default.output + } + } + `, + yetAnotherModule: ` + declare "test" { + argument "input" { + optional = false + } + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "output" { + value = testcomponents.passthrough.pt.output + } + } + `, + config: ` + import.file "testImport" { + filename = "my_module" + } + + testImport.otherModule.anotherModule "myOtherModule" {} + + testcomponents.summation "sum" { + input = testImport.otherModule.anotherModule.myOtherModule.output + } + `, + }, } for _, tc := range testCases { @@ -348,6 +556,12 @@ func TestImportModule(t *testing.T) { defer os.Remove(otherFilename) } + yetAnotherModule := "yet_another_module" + if tc.yetAnotherModule != "" { + require.NoError(t, os.WriteFile(yetAnotherModule, []byte(tc.yetAnotherModule), 0664)) + defer os.Remove(yetAnotherModule) + } + ctrl := flow.New(testOptions(t)) f, err := flow.ParseSource(t.Name(), []byte(tc.config)) require.NoError(t, err)