Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Import statement for module 2.0 experiment #5890

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions component/all/all.go
Original file line number Diff line number Diff line change
@@ -54,6 +54,7 @@ import (
_ "github.com/grafana/agent/component/loki/source/windowsevent" // Import loki.source.windowsevent
_ "github.com/grafana/agent/component/loki/write" // Import loki.write
_ "github.com/grafana/agent/component/mimir/rules/kubernetes" // Import mimir.rules.kubernetes
_ "github.com/grafana/agent/component/module" // Import module.module_component
_ "github.com/grafana/agent/component/module/file" // Import module.file
_ "github.com/grafana/agent/component/module/git" // Import module.git
_ "github.com/grafana/agent/component/module/http" // Import module.http
119 changes: 119 additions & 0 deletions component/module/module_component.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package module

import (
"context"
"sync"
"sync/atomic"

"github.com/grafana/agent/component"
)

func init() {
component.Register(component.Registration{
Name: "module_component",
Args: Arguments{},
Exports: Exports{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return New(opts, args.(Arguments))
},
})
}

// Arguments holds values which are used to configure the module.
type Arguments = map[string]any
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this trick also work for exports? Right now I think you'd need to get exports via namespace.comp_name.exports.something instead of namespace.compo_name.something.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice one, yes that should work. I tried it with unit tests and it worked :)


// Component implements the module.file component.
type Component struct {
opts component.Options
mod *ModuleComponent

mut sync.RWMutex
args Arguments
content string
updatedOnce atomic.Bool
}

var (
_ component.Component = (*Component)(nil)
_ component.HealthComponent = (*Component)(nil)
)

// New creates a new module.file component.
func New(o component.Options, args Arguments) (*Component, error) {
m, err := NewModuleComponent(o)
if err != nil {
return nil, err
}

c := &Component{
opts: o,
mod: m,
args: args,
}
// we don't update on create because we don't have the content yet
return c, nil
}

// Run implements component.Component.
func (c *Component) Run(ctx context.Context) error {
c.mod.RunFlowController(ctx)
return nil
}

// Update implements component.Component.
func (c *Component) Update(args component.Arguments) error {
newArgs := args.(Arguments)
c.setArgs(newArgs)
c.updatedOnce.Store(true)
return c.reload()
}

// UpdateContent reloads the module with a new config
func (c *Component) UpdateContent(content string) error {
if content != c.getContent() {
c.setContent(content)
return c.reload()
}
return nil
}

func (c *Component) reload() error {
if c.getContent() == "" || !c.updatedOnce.Load() {
return nil // the module is not yet ready
}
return c.mod.LoadFlowSource(c.getArgs(), c.getContent())
}

// CurrentHealth implements component.HealthComponent.
func (c *Component) CurrentHealth() component.Health {
return c.mod.CurrentHealth()
}

// getArgs is a goroutine safe way to get args
func (c *Component) getArgs() Arguments {
c.mut.RLock()
defer c.mut.RUnlock()
return c.args
}

// setArgs is a goroutine safe way to set args
func (c *Component) setArgs(args Arguments) {
c.mut.Lock()
c.args = args
c.mut.Unlock()
}

// getContent is a goroutine safe way to get content
func (c *Component) getContent() string {
c.mut.RLock()
defer c.mut.RUnlock()
return c.content
}

// setContent is a goroutine safe way to set content
func (c *Component) setContent(content string) {
c.mut.Lock()
c.content = content
c.mut.Unlock()
}
28 changes: 28 additions & 0 deletions integration-tests/tests/scrape-prom-metrics-modules/config.river
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
logging {
level = "debug"
}

import.file "scrape_module" {
filename = "module.river"
}

scrape_module "scrape_prom_metrics_modules" {
scrape_endpoint = "localhost:9001"
forward_to = [prometheus.remote_write.scrape_prom_metrics_modules.receiver]
}

prometheus.remote_write "scrape_prom_metrics_modules" {
endpoint {
url = "http://localhost:9009/api/v1/push"
send_native_histograms = true
metadata_config {
send_interval = "1s"
}
queue_config {
max_samples_per_send = 100
}
}
external_labels = {
test_name = "scrape_prom_metrics_modules",
}
}
19 changes: 19 additions & 0 deletions integration-tests/tests/scrape-prom-metrics-modules/module.river
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
argument "scrape_endpoint" {}

argument "forward_to" {}

argument "scrape_interval" {
optional = true
default = "1s"
}

prometheus.scrape "scrape_prom_metrics_modules" {
targets = [
{"__address__" = argument.scrape_endpoint.value},
]
forward_to = argument.forward_to.value
scrape_classic_histograms = true
enable_protobuf_negotiation = true
scrape_interval = argument.scrape_interval.value
scrape_timeout = "500ms"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package main

import (
"fmt"
"strconv"
"testing"

"github.com/grafana/agent/integration-tests/common"
"github.com/stretchr/testify/assert"
)

const promURL = "http://localhost:9009/prometheus/api/v1/query?query="

func metricQuery(metricName string) string {
return fmt.Sprintf("%s%s{test_name='scrape_prom_metrics_modules'}", promURL, metricName)
}

func TestScrapePromMetricsModules(t *testing.T) {
metrics := []string{
// TODO: better differentiate these metric types?
"golang_counter",
"golang_gauge",
"golang_histogram_bucket",
"golang_summary",
"golang_native_histogram",
}

for _, metric := range metrics {
metric := metric
t.Run(metric, func(t *testing.T) {
t.Parallel()
if metric == "golang_native_histogram" {
assertHistogramData(t, metricQuery(metric), metric)
} else {
assertMetricData(t, metricQuery(metric), metric)
}
})
}
}

func assertHistogramData(t *testing.T, query string, expectedMetric string) {
var metricResponse common.MetricResponse
assert.EventuallyWithT(t, func(c *assert.CollectT) {
err := common.FetchDataFromURL(query, &metricResponse)
assert.NoError(c, err)
if assert.NotEmpty(c, metricResponse.Data.Result) {
assert.Equal(c, metricResponse.Data.Result[0].Metric.Name, expectedMetric)
assert.Equal(c, metricResponse.Data.Result[0].Metric.TestName, "scrape_prom_metrics_modules")
if assert.NotNil(c, metricResponse.Data.Result[0].Histogram) {
histogram := metricResponse.Data.Result[0].Histogram
if assert.NotEmpty(c, histogram.Data.Count) {
count, _ := strconv.Atoi(histogram.Data.Count)
assert.Greater(c, count, 10, "Count should be at some point greater than 10.")
}
if assert.NotEmpty(c, histogram.Data.Sum) {
sum, _ := strconv.ParseFloat(histogram.Data.Sum, 64)
assert.Greater(c, sum, 10., "Sum should be at some point greater than 10.")
}
assert.NotEmpty(c, histogram.Data.Buckets)
assert.Nil(c, metricResponse.Data.Result[0].Value)
}
}
}, common.DefaultTimeout, common.DefaultRetryInterval, "Histogram data did not satisfy the conditions within the time limit")
}

func assertMetricData(t *testing.T, query, expectedMetric string) {
var metricResponse common.MetricResponse
assert.EventuallyWithT(t, func(c *assert.CollectT) {
err := common.FetchDataFromURL(query, &metricResponse)
assert.NoError(c, err)
if assert.NotEmpty(c, metricResponse.Data.Result) {
assert.Equal(c, metricResponse.Data.Result[0].Metric.Name, expectedMetric)
assert.Equal(c, metricResponse.Data.Result[0].Metric.TestName, "scrape_prom_metrics_modules")
assert.NotEmpty(c, metricResponse.Data.Result[0].Value.Value)
assert.Nil(c, metricResponse.Data.Result[0].Histogram)
}
}, common.DefaultTimeout, common.DefaultRetryInterval, "Data did not satisfy the conditions within the time limit")
}
5 changes: 5 additions & 0 deletions pkg/flow/flow.go
Original file line number Diff line number Diff line change
@@ -247,13 +247,18 @@ func (f *Flow) Run(ctx context.Context) {
var (
components = f.loader.Components()
services = f.loader.Services()
imports = f.loader.Imports()

runnables = make([]controller.RunnableNode, 0, len(components)+len(services))
)
for _, c := range components {
runnables = append(runnables, c)
}

for _, i := range imports {
runnables = append(runnables, i)
}

// Only the root controller should run services, since modules share the
// same service instance as the root.
if !f.opts.IsModule {
Loading