Skip to content

Commit

Permalink
Implement new modules with import and declare keywords.
Browse files Browse the repository at this point in the history
  • Loading branch information
wildum committed Jan 12, 2024
1 parent e6b8ac7 commit a73e2e3
Show file tree
Hide file tree
Showing 60 changed files with 3,357 additions and 168 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,7 @@ jobs:
go-version: "1.21"
- name: Set OTEL Exporter Endpoint
run: echo "OTEL_EXPORTER_ENDPOINT=172.17.0.1:4318" >> $GITHUB_ENV
- name: Set Module HTTP Endpoint
run: echo "MODULE_HTTP_ENDPOINT=http://172.17.0.1:8090/module.river" >> $GITHUB_ENV
- name: Run tests
run: make integration-test
4 changes: 2 additions & 2 deletions cmd/internal/flowmode/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func (fr *flowRun) Run(configPath string) error {
if err != nil {
return nil, fmt.Errorf("reading config path %q: %w", configPath, err)
}
if err := f.LoadSource(flowSource, nil); err != nil {
if err := f.LoadSource(flowSource, nil, nil); err != nil {
return flowSource, fmt.Errorf("error during the initial grafana/agent load: %w", err)
}

Expand Down Expand Up @@ -360,7 +360,7 @@ func getEnabledComponentsFunc(f *flow.Flow) func() map[string]interface{} {
components := component.GetAllComponents(f, component.InfoOptions{})
componentNames := map[string]struct{}{}
for _, c := range components {
componentNames[c.Registration.Name] = struct{}{}
componentNames[c.BlockName] = struct{}{}
}
return map[string]interface{}{"enabled-components": maps.Keys(componentNames)}
}
Expand Down
6 changes: 3 additions & 3 deletions component/component_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ type Info struct {
// this component depends on, or is depended on by, respectively.
References, ReferencedBy []string

Registration Registration // Component registration.
Health Health // Current component health.
BlockName string // Component block name.
Health Health // Current component health.

Arguments Arguments // Current arguments value of the component.
Exports Exports // Current exports value of the component.
Expand Down Expand Up @@ -157,7 +157,7 @@ func (info *Info) MarshalJSON() ([]byte, error) {
}

return json.Marshal(&componentDetailJSON{
Name: info.Registration.Name,
Name: info.BlockName,
Type: "block",
ModuleID: info.ID.ModuleID,
LocalID: info.ID.LocalID,
Expand Down
6 changes: 3 additions & 3 deletions component/module/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ var (

// New creates a new module.file component.
func New(o component.Options, args Arguments) (*Component, error) {
m, err := module.NewModuleComponent(o)
m, err := module.NewModuleComponentDeprecated(o)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -88,7 +88,7 @@ func (c *Component) newManagedLocalComponent(o component.Options) (*file.Compone

if !c.inUpdate.Load() && c.isCreated.Load() {
// Any errors found here are reported via component health
_ = c.mod.LoadFlowSource(c.getArgs().Arguments, c.getContent().Value)
_ = c.mod.LoadFlowSource(c.getArgs().Arguments, c.getContent().Value, nil)
}
}

Expand Down Expand Up @@ -135,7 +135,7 @@ func (c *Component) Update(args component.Arguments) error {

// Force a content load here and bubble up any error. This will catch problems
// on initial load.
return c.mod.LoadFlowSource(newArgs.Arguments, c.getContent().Value)
return c.mod.LoadFlowSource(newArgs.Arguments, c.getContent().Value, nil)
}

// CurrentHealth implements component.HealthComponent.
Expand Down
6 changes: 3 additions & 3 deletions component/module/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"github.com/go-kit/log"
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/module"
"github.com/grafana/agent/component/module/git/internal/vcs"
"github.com/grafana/agent/pkg/flow/logging/level"
vcs "github.com/grafana/agent/pkg/util/git"
)

func init() {
Expand Down Expand Up @@ -74,7 +74,7 @@ var (

// New creates a new module.git component.
func New(o component.Options, args Arguments) (*Component, error) {
m, err := module.NewModuleComponent(o)
m, err := module.NewModuleComponentDeprecated(o)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -239,7 +239,7 @@ func (c *Component) pollFile(ctx context.Context, args Arguments) error {
return err
}

return c.mod.LoadFlowSource(args.Arguments, string(bb))
return c.mod.LoadFlowSource(args.Arguments, string(bb), nil)
}

// CurrentHealth implements component.HealthComponent.
Expand Down
6 changes: 3 additions & 3 deletions component/module/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ var (

// New creates a new module.http component.
func New(o component.Options, args Arguments) (*Component, error) {
m, err := module.NewModuleComponent(o)
m, err := module.NewModuleComponentDeprecated(o)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -87,7 +87,7 @@ func (c *Component) newManagedLocalComponent(o component.Options) (*remote_http.

if !c.inUpdate.Load() && c.isCreated.Load() {
// Any errors found here are reported via component health
_ = c.mod.LoadFlowSource(c.getArgs().Arguments, c.getContent().Value)
_ = c.mod.LoadFlowSource(c.getArgs().Arguments, c.getContent().Value, nil)
}
}

Expand Down Expand Up @@ -134,7 +134,7 @@ func (c *Component) Update(args component.Arguments) error {

// Force a content load here and bubble up any error. This will catch problems
// on initial load.
return c.mod.LoadFlowSource(newArgs.Arguments, c.getContent().Value)
return c.mod.LoadFlowSource(newArgs.Arguments, c.getContent().Value, nil)
}

// CurrentHealth implements component.HealthComponent.
Expand Down
41 changes: 34 additions & 7 deletions component/module/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,34 @@ type ModuleComponent struct {
opts component.Options
mod component.Module

mut sync.RWMutex
health component.Health
latestContent string
latestArgs map[string]any
mut sync.RWMutex
health component.Health
latestContent string
latestArgs map[string]any
latestParentModuleDefinitions map[string]string
}

// Exports holds values which are exported from the run module.
// This export type is deprecated.
type Exports struct {
// Exports exported from the running module.
Exports map[string]any `river:"exports,block"`
}

// NewModuleComponent initializes a new ModuleComponent.
func NewModuleComponent(o component.Options) (*ModuleComponent, error) {
c := &ModuleComponent{
opts: o,
}
var err error
c.mod, err = o.ModuleController.NewModule("", func(exports map[string]any) {
c.opts.OnStateChange(exports)
})
return c, err
}

// TODO: Remove when getting rid of old modules
func NewModuleComponentDeprecated(o component.Options) (*ModuleComponent, error) {
c := &ModuleComponent{
opts: o,
}
Expand All @@ -43,12 +57,12 @@ func NewModuleComponent(o component.Options) (*ModuleComponent, error) {
// LoadFlowSource loads the flow controller with the current component source.
// It will set the component health in addition to return the error so that the consumer can rely on either or both.
// If the content is the same as the last time it was successfully loaded, it will not be reloaded.
func (c *ModuleComponent) LoadFlowSource(args map[string]any, contentValue string) error {
if reflect.DeepEqual(args, c.getLatestArgs()) && contentValue == c.getLatestContent() {
func (c *ModuleComponent) LoadFlowSource(args map[string]any, contentValue string, parentModuleDefinitions map[string]string) error {
if reflect.DeepEqual(args, c.getLatestArgs()) && contentValue == c.getLatestContent() && reflect.DeepEqual(args, c.getLatestParentModuleDefinitions()) {
return nil
}

err := c.mod.LoadConfig([]byte(contentValue), args)
err := c.mod.LoadConfig([]byte(contentValue), args, parentModuleDefinitions)
if err != nil {
c.setHealth(component.Health{
Health: component.HealthTypeUnhealthy,
Expand All @@ -61,6 +75,7 @@ func (c *ModuleComponent) LoadFlowSource(args map[string]any, contentValue strin

c.setLatestArgs(args)
c.setLatestContent(contentValue)
c.setLatestParentModuleDefinitions(parentModuleDefinitions)
c.setHealth(component.Health{
Health: component.HealthTypeHealthy,
Message: "module content loaded",
Expand Down Expand Up @@ -104,6 +119,18 @@ func (c *ModuleComponent) getLatestContent() string {
return c.latestContent
}

func (c *ModuleComponent) setLatestParentModuleDefinitions(parentModuleDefinitions map[string]string) {
c.mut.Lock()
defer c.mut.Unlock()
c.latestParentModuleDefinitions = parentModuleDefinitions
}

func (c *ModuleComponent) getLatestParentModuleDefinitions() map[string]string {
c.mut.RLock()
defer c.mut.RUnlock()
return c.latestParentModuleDefinitions
}

func (c *ModuleComponent) setLatestArgs(args map[string]any) {
c.mut.Lock()
defer c.mut.Unlock()
Expand Down
4 changes: 2 additions & 2 deletions component/module/string/string.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var (

// New creates a new module.string component.
func New(o component.Options, args Arguments) (*Component, error) {
m, err := module.NewModuleComponent(o)
m, err := module.NewModuleComponentDeprecated(o)
if err != nil {
return nil, err
}
Expand All @@ -66,7 +66,7 @@ func (c *Component) Run(ctx context.Context) error {
func (c *Component) Update(args component.Arguments) error {
newArgs := args.(Arguments)

return c.mod.LoadFlowSource(newArgs.Arguments, newArgs.Content.Value)
return c.mod.LoadFlowSource(newArgs.Arguments, newArgs.Content.Value, nil)
}

// CurrentHealth implements component.HealthComponent.
Expand Down
2 changes: 1 addition & 1 deletion component/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type Module interface {
// LoadConfig parses River config and loads it into the Module.
// LoadConfig can be called multiple times, and called prior to
// [Module.Run].
LoadConfig(config []byte, args map[string]any) error
LoadConfig(config []byte, args map[string]any, moduleDefinitions map[string]string) error

// Run starts the Module. No components within the Module
// will be run until Run is called.
Expand Down
2 changes: 1 addition & 1 deletion converter/internal/test_common/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func attemptLoadingFlowConfig(t *testing.T, river []byte) {
labelstore.New(nil, prometheus.DefaultRegisterer),
},
})
err = f.LoadSource(cfg, nil)
err = f.LoadSource(cfg, nil, nil)

// Many components will fail to build as e.g. the cert files are missing, so we ignore these errors.
// This is not ideal, but we still validate for other potential issues.
Expand Down
21 changes: 21 additions & 0 deletions integration-tests/configs/http-module/module.river
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
declare "myModule" {
argument "scrape_endpoint" {}

argument "forward_to" {}

argument "scrape_interval" {
optional = true
default = "1s"
}

prometheus.scrape "scrape_prom_metrics_module_file" {
targets = [
{"__address__" = argument.scrape_endpoint.value},
]
forward_to = argument.forward_to.value
scrape_classic_histograms = true
enable_protobuf_negotiation = true
scrape_interval = argument.scrape_interval.value
scrape_timeout = "500ms"
}
}
9 changes: 8 additions & 1 deletion integration-tests/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,11 @@ services:
dockerfile: ./integration-tests/configs/prom-gen/Dockerfile
context: ..
ports:
- "9001:9001"
- "9001:9001"

http-module:
image: nginx:alpine
ports:
- "8090:80"
volumes:
- ./configs/http-module/module.river:/usr/share/nginx/html/module.river:ro
38 changes: 38 additions & 0 deletions integration-tests/tests/module-file/config.river
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import.file "import_loki" {
filename = "loki.river"
}

import_loki.loki "loki" {}

import.file "import_prom_scrape" {
filename = "prom-scrape.river"
}

declare "target" {
export "output" {
value = "localhost:9001"
}
}

target "promTarget" {}

import_prom_scrape.scrape "promScraper" {
scrape_endpoint = target.promTarget.output
forward_to = [prometheus.remote_write.module_file.receiver]
}

prometheus.remote_write "module_file" {
endpoint {
url = "http://localhost:9009/api/v1/push"
send_native_histograms = true
metadata_config {
send_interval = "1s"
}
queue_config {
max_samples_per_send = 100
}
}
external_labels = {
test_name = "module_file",
}
}
5 changes: 5 additions & 0 deletions integration-tests/tests/module-file/logfile.river
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
declare "getLogFile" {
export "output" {
value = [{__path__ = "logs.txt"}]
}
}
13 changes: 13 additions & 0 deletions integration-tests/tests/module-file/logs.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[2023-10-02 14:25:43] INFO: Starting the web application...
[2023-10-02 14:25:45] DEBUG: Database connection established.
[2023-10-02 14:26:01] INFO: User 'john_doe' logged in.
[2023-10-02 14:26:05] WARNING: User 'john_doe' attempted to access restricted area.
[2023-10-02 14:26:10] ERROR: Failed to retrieve data for item ID: 1234.
[2023-10-02 14:26:15] INFO: User 'john_doe' logged out.
[2023-10-02 14:27:00] INFO: User 'admin' logged in.
[2023-10-02 14:27:05] INFO: Data backup started.
[2023-10-02 14:30:00] INFO: Data backup completed successfully.
[2023-10-02 14:31:23] ERROR: Database connection lost. Retrying in 5 seconds...
[2023-10-02 14:31:28] INFO: Database reconnected.
[2023-10-02 14:32:00] INFO: User 'admin' logged out.
[2023-10-02 14:32:05] INFO: Shutting down the web application...
17 changes: 17 additions & 0 deletions integration-tests/tests/module-file/loki.river
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import.file "logTarget" {
filename = "logfile.river"
}

import.file "write" {
filename = "loki_write.river"
}

declare "loki" {
logTarget.getLogFile "logFile" {}
loki.source.file "test" {
targets = logTarget.getLogFile.logFile.output
forward_to = [write.loki_write.default.receiver]
}
write.loki_write "default" {}
}

13 changes: 13 additions & 0 deletions integration-tests/tests/module-file/loki_write.river
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
declare "loki_write" {
loki.write "test" {
endpoint {
url = "http://localhost:3100/loki/api/v1/push"
}
external_labels = {
test_name = "module_file",
}
}
export "receiver" {
value = loki.write.test.receiver
}
}
Loading

0 comments on commit a73e2e3

Please sign in to comment.