Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Module with declare and import #5968

Closed
wants to merge 41 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
a73e2e3
Implement new modules with import and declare keywords.
wildum Nov 30, 2023
ebc9028
Merge branch 'main' into modules-declare-import
wildum Jan 12, 2024
aca2289
minor changes following review
wildum Jan 15, 2024
8f50ca1
use Declare type instead of plain string to use the AST instead of pa…
wildum Jan 15, 2024
69a1d69
introduces a new componentNode interface
wildum Jan 16, 2024
f020082
rename according to new terminology
wildum Jan 16, 2024
130a98e
some additional renaming
wildum Jan 16, 2024
c883d1a
introduce the component node manager
wildum Jan 16, 2024
cb983c5
use deprecation notice on module component constructor
wildum Jan 16, 2024
e7fb05e
fix test after changing error message
wildum Jan 16, 2024
e2f9c99
custom component should return the managed component via Component()
wildum Jan 16, 2024
3c24a3c
introduces LoaderConfigOptions used to pass options like additional d…
wildum Jan 17, 2024
b7aa47e
ignore linter warning on deprecated func
wildum Jan 17, 2024
189a2b2
add deprecated doc on module Exports type
wildum Jan 17, 2024
7bfc196
rename function that decides if a component is a custom one
wildum Jan 17, 2024
1cb8a31
rename firstPart to namespace
wildum Jan 17, 2024
b93184e
make declare fields private
wildum Jan 17, 2024
d40e409
add comment regarding the redundancy in the Declare struct
wildum Jan 17, 2024
ce8ec22
rename blocks in loader to prevent confusion between componentBlocks …
wildum Jan 17, 2024
28d8f5f
remove unused field in loader
wildum Jan 17, 2024
3a9d20d
fix mutex and flaky test
wildum Jan 17, 2024
32e7811
remove unecessary mutex in declare
wildum Jan 17, 2024
fc0770e
add more doc for the Apply function
wildum Jan 18, 2024
e0e018b
rename and document interfaces
wildum Jan 18, 2024
3ca3692
cleanup on comments, namings and mutex
wildum Jan 18, 2024
40f5fd9
update running children in case of an update of the content in import…
wildum Jan 18, 2024
79b98ea
check for go routines leak
wildum Jan 18, 2024
65de374
comment cleanup
wildum Jan 18, 2024
219474a
can cancel on all paths for linter
wildum Jan 18, 2024
4f2d64d
improve import children handling
wildum Jan 18, 2024
3fdc372
renaming
wildum Jan 19, 2024
d53e29e
optimization to avoid reloading a module if the imported content did …
wildum Jan 19, 2024
f384f0b
improve tests
wildum Jan 19, 2024
fdcf18a
remove forgotten print
wildum Jan 19, 2024
d8e37c0
merge main
wildum Jan 22, 2024
c0bfd61
Merge branch 'main' into modules-declare-import
wildum Jan 23, 2024
fea7a8a
store args in custom component
wildum Jan 23, 2024
dbc4be7
merge main
wildum Jan 23, 2024
5ef4670
refactor integration tests to remove redundancy
wildum Jan 23, 2024
dcbc6d6
merge main
wildum Jan 25, 2024
60d6eca
cleanup
wildum Jan 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,7 @@ jobs:
go-version: "1.21"
- name: Set OTEL Exporter Endpoint
run: echo "OTEL_EXPORTER_ENDPOINT=172.17.0.1:4318" >> $GITHUB_ENV
- name: Set Module HTTP Endpoint
run: echo "MODULE_HTTP_ENDPOINT=http://172.17.0.1:8090/module.river" >> $GITHUB_ENV
- name: Run tests
run: make integration-test
11 changes: 7 additions & 4 deletions component/module/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/local/file"
"github.com/grafana/agent/component/module"
"github.com/grafana/agent/pkg/flow/config"
"github.com/grafana/river/rivertypes"
)

func init() {
component.Register(component.Registration{
Name: "module.file",
Args: Arguments{},
Name: "module.file",
Args: Arguments{},
//nolint:staticcheck
Exports: module.Exports{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
Expand Down Expand Up @@ -58,6 +60,7 @@ var (

// New creates a new module.file component.
func New(o component.Options, args Arguments) (*Component, error) {
//nolint:staticcheck
m, err := module.NewModuleComponent(o)
if err != nil {
return nil, err
Expand Down Expand Up @@ -88,7 +91,7 @@ func (c *Component) newManagedLocalComponent(o component.Options) (*file.Compone

if !c.inUpdate.Load() && c.isCreated.Load() {
// Any errors found here are reported via component health
_ = c.mod.LoadFlowSource(c.getArgs().Arguments, c.getContent().Value)
_ = c.mod.LoadFlowSource(c.getArgs().Arguments, c.getContent().Value, config.DefaultLoaderConfigOptions())
}
}

Expand Down Expand Up @@ -135,7 +138,7 @@ func (c *Component) Update(args component.Arguments) error {

// Force a content load here and bubble up any error. This will catch problems
// on initial load.
return c.mod.LoadFlowSource(newArgs.Arguments, c.getContent().Value)
return c.mod.LoadFlowSource(newArgs.Arguments, c.getContent().Value, config.DefaultLoaderConfigOptions())
}

// CurrentHealth implements component.HealthComponent.
Expand Down
9 changes: 6 additions & 3 deletions component/module/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ import (
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/module"
"github.com/grafana/agent/internal/vcs"
"github.com/grafana/agent/pkg/flow/config"
"github.com/grafana/agent/pkg/flow/logging/level"
)

func init() {
component.Register(component.Registration{
Name: "module.git",
Args: Arguments{},
Name: "module.git",
Args: Arguments{},
//nolint:staticcheck
Exports: module.Exports{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
Expand Down Expand Up @@ -74,6 +76,7 @@ var (

// New creates a new module.git component.
func New(o component.Options, args Arguments) (*Component, error) {
//nolint:staticcheck
m, err := module.NewModuleComponent(o)
if err != nil {
return nil, err
Expand Down Expand Up @@ -239,7 +242,7 @@ func (c *Component) pollFile(ctx context.Context, args Arguments) error {
return err
}

return c.mod.LoadFlowSource(args.Arguments, string(bb))
return c.mod.LoadFlowSource(args.Arguments, string(bb), config.DefaultLoaderConfigOptions())
}

// CurrentHealth implements component.HealthComponent.
Expand Down
11 changes: 7 additions & 4 deletions component/module/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/module"
remote_http "github.com/grafana/agent/component/remote/http"
"github.com/grafana/agent/pkg/flow/config"
"github.com/grafana/river/rivertypes"
)

func init() {
component.Register(component.Registration{
Name: "module.http",
Args: Arguments{},
Name: "module.http",
Args: Arguments{},
//nolint:staticcheck
Exports: module.Exports{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
Expand Down Expand Up @@ -57,6 +59,7 @@ var (

// New creates a new module.http component.
func New(o component.Options, args Arguments) (*Component, error) {
//nolint:staticcheck
m, err := module.NewModuleComponent(o)
if err != nil {
return nil, err
Expand Down Expand Up @@ -87,7 +90,7 @@ func (c *Component) newManagedLocalComponent(o component.Options) (*remote_http.

if !c.inUpdate.Load() && c.isCreated.Load() {
// Any errors found here are reported via component health
_ = c.mod.LoadFlowSource(c.getArgs().Arguments, c.getContent().Value)
_ = c.mod.LoadFlowSource(c.getArgs().Arguments, c.getContent().Value, config.DefaultLoaderConfigOptions())
}
}

Expand Down Expand Up @@ -134,7 +137,7 @@ func (c *Component) Update(args component.Arguments) error {

// Force a content load here and bubble up any error. This will catch problems
// on initial load.
return c.mod.LoadFlowSource(newArgs.Arguments, c.getContent().Value)
return c.mod.LoadFlowSource(newArgs.Arguments, c.getContent().Value, config.DefaultLoaderConfigOptions())
}

// CurrentHealth implements component.HealthComponent.
Expand Down
63 changes: 53 additions & 10 deletions component/module/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/grafana/agent/component"
"github.com/grafana/agent/pkg/flow/config"
"github.com/grafana/agent/pkg/flow/logging/level"
)

Expand All @@ -16,22 +17,40 @@ type ModuleComponent struct {
opts component.Options
mod component.Module

mut sync.RWMutex
health component.Health
latestContent string
latestArgs map[string]any
mut sync.RWMutex
health component.Health
latestContent string
latestArgs map[string]any
latestLoaderConfigOptions config.LoaderConfigOptions
}

// Exports holds values which are exported from the run module.
// Deprecated: Exports holds values which are exported from the run module. New modules use map[string]any directly.
type Exports struct {
// Exports exported from the running module.
Exports map[string]any `river:"exports,block"`
}

// NewModuleComponent initializes a new ModuleComponent.
var _ component.Component = (*ModuleComponent)(nil)

// NewModuleComponentV2 initializes a new ModuleComponent.
// Compared to the previous constructor, the export is simply map[string]any instead of the Exports type containing the map.
func NewModuleComponentV2(o component.Options) (*ModuleComponent, error) {
c := &ModuleComponent{
opts: o,
latestLoaderConfigOptions: config.DefaultLoaderConfigOptions(),
}
var err error
c.mod, err = o.ModuleController.NewModule("", func(exports map[string]any) {
c.opts.OnStateChange(exports)
})
return c, err
}

// Deprecated: Use NewModuleComponentV2 instead.
func NewModuleComponent(o component.Options) (*ModuleComponent, error) {
c := &ModuleComponent{
opts: o,
opts: o,
latestLoaderConfigOptions: config.DefaultLoaderConfigOptions(),
}
var err error
c.mod, err = o.ModuleController.NewModule("", func(exports map[string]any) {
Expand All @@ -43,12 +62,12 @@ func NewModuleComponent(o component.Options) (*ModuleComponent, error) {
// LoadFlowSource loads the flow controller with the current component source.
// It will set the component health in addition to return the error so that the consumer can rely on either or both.
// If the content is the same as the last time it was successfully loaded, it will not be reloaded.
func (c *ModuleComponent) LoadFlowSource(args map[string]any, contentValue string) error {
if reflect.DeepEqual(args, c.getLatestArgs()) && contentValue == c.getLatestContent() {
func (c *ModuleComponent) LoadFlowSource(args map[string]any, contentValue string, options config.LoaderConfigOptions) error {
if reflect.DeepEqual(args, c.getLatestArgs()) && contentValue == c.getLatestContent() && reflect.DeepEqual(options, c.getLatestLoaderConfigOptions()) {
return nil
}

err := c.mod.LoadConfig([]byte(contentValue), args)
err := c.mod.LoadConfig([]byte(contentValue), args, options)
if err != nil {
c.setHealth(component.Health{
Health: component.HealthTypeUnhealthy,
Expand All @@ -61,6 +80,7 @@ func (c *ModuleComponent) LoadFlowSource(args map[string]any, contentValue strin

c.setLatestArgs(args)
c.setLatestContent(contentValue)
c.setLatestLoaderConfigOptions(options)
c.setHealth(component.Health{
Health: component.HealthTypeHealthy,
Message: "module content loaded",
Expand All @@ -70,6 +90,17 @@ func (c *ModuleComponent) LoadFlowSource(args map[string]any, contentValue strin
return nil
}

// Run implements component.Component.
func (c *ModuleComponent) Run(ctx context.Context) error {
<-ctx.Done()
return nil
}

// Update implements component.Component.
func (c *ModuleComponent) Update(_ component.Arguments) error {
return nil
}

// RunFlowController runs the flow controller that all module components start.
func (c *ModuleComponent) RunFlowController(ctx context.Context) {
err := c.mod.Run(ctx)
Expand Down Expand Up @@ -104,6 +135,18 @@ func (c *ModuleComponent) getLatestContent() string {
return c.latestContent
}

func (c *ModuleComponent) setLatestLoaderConfigOptions(options config.LoaderConfigOptions) {
c.mut.Lock()
defer c.mut.Unlock()
c.latestLoaderConfigOptions = options
}

func (c *ModuleComponent) getLatestLoaderConfigOptions() config.LoaderConfigOptions {
c.mut.RLock()
defer c.mut.RUnlock()
return c.latestLoaderConfigOptions
}

func (c *ModuleComponent) setLatestArgs(args map[string]any) {
c.mut.Lock()
defer c.mut.Unlock()
Expand Down
9 changes: 6 additions & 3 deletions component/module/string/string.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/module"
"github.com/grafana/agent/pkg/flow/config"
"github.com/grafana/river/rivertypes"
)

func init() {
component.Register(component.Registration{
Name: "module.string",
Args: Arguments{},
Name: "module.string",
Args: Arguments{},
//nolint:staticcheck
Exports: module.Exports{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
Expand Down Expand Up @@ -42,6 +44,7 @@ var (

// New creates a new module.string component.
func New(o component.Options, args Arguments) (*Component, error) {
//nolint:staticcheck
m, err := module.NewModuleComponent(o)
if err != nil {
return nil, err
Expand All @@ -66,7 +69,7 @@ func (c *Component) Run(ctx context.Context) error {
func (c *Component) Update(args component.Arguments) error {
newArgs := args.(Arguments)

return c.mod.LoadFlowSource(newArgs.Arguments, newArgs.Content.Value)
return c.mod.LoadFlowSource(newArgs.Arguments, newArgs.Content.Value, config.DefaultLoaderConfigOptions())
}

// CurrentHealth implements component.HealthComponent.
Expand Down
3 changes: 2 additions & 1 deletion component/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"

"github.com/go-kit/log"
"github.com/grafana/agent/pkg/flow/config"
"github.com/grafana/regexp"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -44,7 +45,7 @@ type Module interface {
// LoadConfig parses River config and loads it into the Module.
// LoadConfig can be called multiple times, and called prior to
// [Module.Run].
LoadConfig(config []byte, args map[string]any) error
LoadConfig(config []byte, args map[string]any, options config.LoaderConfigOptions) error

// Run starts the Module. No components within the Module
// will be run until Run is called.
Expand Down
21 changes: 21 additions & 0 deletions integration-tests/configs/http-module/module.river
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
declare "myModule" {
argument "scrape_endpoint" {}

argument "forward_to" {}

argument "scrape_interval" {
optional = true
default = "1s"
}

prometheus.scrape "scrape_prom_metrics_module_file" {
targets = [
{"__address__" = argument.scrape_endpoint.value},
]
forward_to = argument.forward_to.value
scrape_classic_histograms = true
enable_protobuf_negotiation = true
scrape_interval = argument.scrape_interval.value
scrape_timeout = "500ms"
}
}
9 changes: 8 additions & 1 deletion integration-tests/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,11 @@ services:
dockerfile: ./integration-tests/configs/prom-gen/Dockerfile
context: ..
ports:
- "9001:9001"
- "9001:9001"

http-module:
image: nginx:alpine
ports:
- "8090:80"
volumes:
- ./configs/http-module/module.river:/usr/share/nginx/html/module.river:ro
38 changes: 38 additions & 0 deletions integration-tests/tests/module-file/config.river
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import.file "import_loki" {
filename = "loki.river"
}

import_loki.loki "loki" {}

import.file "import_prom_scrape" {
filename = "prom-scrape.river"
}

declare "target" {
export "output" {
value = "localhost:9001"
}
}

target "promTarget" {}

import_prom_scrape.scrape "promScraper" {
scrape_endpoint = target.promTarget.output
forward_to = [prometheus.remote_write.module_file.receiver]
}

prometheus.remote_write "module_file" {
endpoint {
url = "http://localhost:9009/api/v1/push"
send_native_histograms = true
metadata_config {
send_interval = "1s"
}
queue_config {
max_samples_per_send = 100
}
}
external_labels = {
test_name = "module_file",
}
}
5 changes: 5 additions & 0 deletions integration-tests/tests/module-file/logfile.river
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
declare "getLogFile" {
export "output" {
value = [{__path__ = "logs.txt"}]
}
}
13 changes: 13 additions & 0 deletions integration-tests/tests/module-file/logs.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[2023-10-02 14:25:43] INFO: Starting the web application...
[2023-10-02 14:25:45] DEBUG: Database connection established.
[2023-10-02 14:26:01] INFO: User 'john_doe' logged in.
[2023-10-02 14:26:05] WARNING: User 'john_doe' attempted to access restricted area.
[2023-10-02 14:26:10] ERROR: Failed to retrieve data for item ID: 1234.
[2023-10-02 14:26:15] INFO: User 'john_doe' logged out.
[2023-10-02 14:27:00] INFO: User 'admin' logged in.
[2023-10-02 14:27:05] INFO: Data backup started.
[2023-10-02 14:30:00] INFO: Data backup completed successfully.
[2023-10-02 14:31:23] ERROR: Database connection lost. Retrying in 5 seconds...
[2023-10-02 14:31:28] INFO: Database reconnected.
[2023-10-02 14:32:00] INFO: User 'admin' logged out.
[2023-10-02 14:32:05] INFO: Shutting down the web application...
Loading
Loading