From 57e22d147cfba7b4e994d3c1bd18654cdc097c4b Mon Sep 17 00:00:00 2001 From: William Dumont Date: Tue, 7 Jan 2025 17:59:30 +0100 Subject: [PATCH] rework foreach txtar tests --- internal/runtime/foreach_test.go | 17 +--- .../runtime/internal/testcomponents/count.go | 15 +-- .../runtime/internal/testcomponents/pulse.go | 91 +++++++++++++++++ .../internal/testcomponents/sumation1.go | 66 ------------- .../internal/testcomponents/sumation2.go | 99 ------------------- .../testcomponents/summation_receiver.go | 96 ++++++++++++++++++ .../runtime/testdata/foreach/foreach_1.txtar | 14 +-- .../runtime/testdata/foreach/foreach_2.txtar | 19 ++++ 8 files changed, 227 insertions(+), 190 deletions(-) create mode 100644 internal/runtime/internal/testcomponents/pulse.go delete mode 100644 internal/runtime/internal/testcomponents/sumation1.go delete mode 100644 internal/runtime/internal/testcomponents/sumation2.go create mode 100644 internal/runtime/internal/testcomponents/summation_receiver.go create mode 100644 internal/runtime/testdata/foreach/foreach_2.txtar diff --git a/internal/runtime/foreach_test.go b/internal/runtime/foreach_test.go index e29d733f4f..1df13c5081 100644 --- a/internal/runtime/foreach_test.go +++ b/internal/runtime/foreach_test.go @@ -3,15 +3,13 @@ package runtime_test import ( "context" "path/filepath" - "strings" "sync" "testing" "time" + "github.com/grafana/alloy/internal/runtime/internal/testcomponents" "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "k8s.io/component-base/metrics/testutil" ) // TODO: Test a foreach inside a foreach. @@ -47,16 +45,9 @@ func testConfigForEach(t *testing.T, reg *prometheus.Registry, config string, re ctrl.Run(ctx) }() - // Check for initial condition - require.EventuallyWithT(t, func(c *assert.CollectT) { - expectedMetrics := ` -# HELP testcomponents_summation2 Summation of all integers received -# TYPE testcomponents_summation2 counter -testcomponents_summation2{component_id="testcomponents.summation2.final",component_path="/"} 2 -` - if err := testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "testcomponents_summation2_total"); err != nil { - c.Errorf("mismatch metrics: %v", err) - } + require.Eventually(t, func() bool { + export := getExport[testcomponents.SummationReceiverExports](t, ctrl, "", "testcomponents.summation_receiver.sum") + return export.Sum == 10 }, 3*time.Second, 10*time.Millisecond) // if update != nil { diff --git a/internal/runtime/internal/testcomponents/count.go b/internal/runtime/internal/testcomponents/count.go index d05070f313..ec778d88c7 100644 --- a/internal/runtime/internal/testcomponents/count.go +++ b/internal/runtime/internal/testcomponents/count.go @@ -29,6 +29,7 @@ func init() { type CountConfig struct { Frequency time.Duration `alloy:"frequency,attr"` Max int `alloy:"max,attr"` + ForwardTo []IntReceiver `alloy:"forward_to,attr,optional"` } type CountExports struct { @@ -63,18 +64,20 @@ func (t *Count) Run(ctx context.Context) error { return nil case <-time.After(t.getNextCount()): t.cfgMut.Lock() - maxCount := t.cfg.Max - t.cfgMut.Unlock() - currentCount := t.count.Load() - if maxCount == 0 || currentCount < int32(maxCount) { + if t.cfg.Max == 0 || currentCount < int32(t.cfg.Max) { if t.count.CompareAndSwap(currentCount, currentCount+1) { - level.Info(t.log).Log("msg", "incremented count", "count", currentCount+1) - t.opts.OnStateChange(CountExports{Count: int(currentCount + 1)}) + newCount := int(currentCount + 1) + level.Info(t.log).Log("msg", "incremented count", "count", newCount) + t.opts.OnStateChange(CountExports{Count: newCount}) + for _, r := range t.cfg.ForwardTo { + r.ReceiveInt(newCount) + } } else { level.Info(t.log).Log("msg", "failed to increment count", "count", currentCount) } } + t.cfgMut.Unlock() } } } diff --git a/internal/runtime/internal/testcomponents/pulse.go b/internal/runtime/internal/testcomponents/pulse.go new file mode 100644 index 0000000000..df61a4d94f --- /dev/null +++ b/internal/runtime/internal/testcomponents/pulse.go @@ -0,0 +1,91 @@ +package testcomponents + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/runtime/logging/level" +) + +func init() { + component.Register(component.Registration{ + Name: "testcomponents.pulse", + Stability: featuregate.StabilityPublicPreview, + Args: PulseConfig{}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + return NewPulse(opts, args.(PulseConfig)) + }, + }) +} + +type PulseConfig struct { + Frequency time.Duration `alloy:"frequency,attr"` + Max int `alloy:"max,attr"` + ForwardTo []IntReceiver `alloy:"forward_to,attr,optional"` +} + +type Pulse struct { + opts component.Options + log log.Logger + + cfgMut sync.Mutex + cfg PulseConfig + count int +} + +func NewPulse(o component.Options, cfg PulseConfig) (*Pulse, error) { + t := &Pulse{opts: o, log: o.Logger} + if err := t.Update(cfg); err != nil { + return nil, err + } + return t, nil +} + +var ( + _ component.Component = (*Pulse)(nil) +) + +func (p *Pulse) Run(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return nil + case <-time.After(p.getNextPulse()): + p.cfgMut.Lock() + if p.cfg.Max == 0 || p.count < p.cfg.Max { + for _, r := range p.cfg.ForwardTo { + r.ReceiveInt(1) + } + p.count++ + } + p.cfgMut.Unlock() + } + } +} + +func (t *Pulse) getNextPulse() time.Duration { + t.cfgMut.Lock() + defer t.cfgMut.Unlock() + return t.cfg.Frequency +} + +// Update implements Component. +func (t *Pulse) Update(args component.Arguments) error { + t.cfgMut.Lock() + defer t.cfgMut.Unlock() + + cfg := args.(PulseConfig) + if cfg.Frequency == 0 { + return fmt.Errorf("frequency must not be 0") + } + + level.Info(t.log).Log("msg", "setting count frequency", "freq", cfg.Frequency) + t.cfg = cfg + return nil +} diff --git a/internal/runtime/internal/testcomponents/sumation1.go b/internal/runtime/internal/testcomponents/sumation1.go deleted file mode 100644 index 63af56370e..0000000000 --- a/internal/runtime/internal/testcomponents/sumation1.go +++ /dev/null @@ -1,66 +0,0 @@ -package testcomponents - -import ( - "context" - - "github.com/go-kit/log" - "github.com/grafana/alloy/internal/component" - "github.com/grafana/alloy/internal/featuregate" -) - -func init() { - component.Register(component.Registration{ - Name: "testcomponents.summation1", - Stability: featuregate.StabilityPublicPreview, - Args: SummationConfig_Entry{}, - Exports: SummationExports_Entry{}, - - Build: func(opts component.Options, args component.Arguments) (component.Component, error) { - return NewSummation_Entry(opts, args.(SummationConfig_Entry)) - }, - }) -} - -// Accepts a single integer input and forwards it to all the components listed in forward_to. -type SummationConfig_Entry struct { - Input int `alloy:"input,attr"` - ForwardTo []IntReceiver `alloy:"forward_to,attr"` -} - -type SummationExports_Entry struct { -} - -type Summation_Entry struct { - opts component.Options - log log.Logger - cfg SummationConfig_Entry -} - -// NewSummation creates a new summation component. -func NewSummation_Entry(o component.Options, cfg SummationConfig_Entry) (*Summation_Entry, error) { - return &Summation_Entry{ - opts: o, - log: o.Logger, - cfg: cfg, - }, nil -} - -var ( - _ component.Component = (*Summation_Entry)(nil) -) - -// Run implements Component. -func (t *Summation_Entry) Run(ctx context.Context) error { - for _, r := range t.cfg.ForwardTo { - r.ReceiveInt(t.cfg.Input) - } - - <-ctx.Done() - return nil -} - -// Update implements Component. -func (t *Summation_Entry) Update(args component.Arguments) error { - // TODO: Implement this? - return nil -} diff --git a/internal/runtime/internal/testcomponents/sumation2.go b/internal/runtime/internal/testcomponents/sumation2.go deleted file mode 100644 index 2c6e8ad2eb..0000000000 --- a/internal/runtime/internal/testcomponents/sumation2.go +++ /dev/null @@ -1,99 +0,0 @@ -package testcomponents - -import ( - "context" - - "github.com/go-kit/log" - "github.com/grafana/alloy/internal/component" - "github.com/grafana/alloy/internal/featuregate" - "github.com/prometheus/client_golang/prometheus" -) - -func init() { - component.Register(component.Registration{ - Name: "testcomponents.summation2", - Stability: featuregate.StabilityPublicPreview, - Args: SummationConfig_2{}, - Exports: SummationExports_2{}, - - Build: func(opts component.Options, args component.Arguments) (component.Component, error) { - return NewSummation_2(opts) - }, - }) -} - -type IntReceiver interface { - ReceiveInt(int) -} - -type IntReceiverImpl struct { - incrementSum func(int) -} - -func (r IntReceiverImpl) ReceiveInt(i int) { - r.incrementSum(i) -} - -type SummationConfig_2 struct { -} - -type SummationExports_2 struct { - Receiver IntReceiver `alloy:"receiver,attr"` -} - -type Summation_2 struct { - opts component.Options - log log.Logger - - reg prometheus.Registerer - counter prometheus.Counter - receiver IntReceiver -} - -// NewSummation creates a new summation component. -func NewSummation_2(o component.Options) (*Summation_2, error) { - counter := prometheus.NewCounter(prometheus.CounterOpts{ - Name: "testcomponents_summation2", - Help: "Summation of all integers received", - }) - - recv := IntReceiverImpl{ - incrementSum: func(i int) { - counter.Add(float64(i)) - }, - } - - t := &Summation_2{ - opts: o, - log: o.Logger, - receiver: recv, - reg: o.Registerer, - counter: counter, - } - - o.OnStateChange(SummationExports_2{ - Receiver: t.receiver, - }) - - return t, nil -} - -var ( - _ component.Component = (*Summation)(nil) -) - -// Run implements Component. -func (t *Summation_2) Run(ctx context.Context) error { - if err := t.reg.Register(t.counter); err != nil { - return err - } - defer t.reg.Unregister(t.counter) - - <-ctx.Done() - return nil -} - -// Update implements Component. -func (t *Summation_2) Update(args component.Arguments) error { - return nil -} diff --git a/internal/runtime/internal/testcomponents/summation_receiver.go b/internal/runtime/internal/testcomponents/summation_receiver.go new file mode 100644 index 0000000000..23eed69a28 --- /dev/null +++ b/internal/runtime/internal/testcomponents/summation_receiver.go @@ -0,0 +1,96 @@ +package testcomponents + +import ( + "context" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/featuregate" + "go.uber.org/atomic" +) + +func init() { + component.Register(component.Registration{ + Name: "testcomponents.summation_receiver", + Stability: featuregate.StabilityPublicPreview, + Args: SummationReceiverConfig{}, + Exports: SummationReceiverExports{}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + return NewSummationReceiver(opts, args.(SummationReceiverConfig)) + }, + }) +} + +type SummationReceiverConfig struct { +} + +type IntReceiver interface { + ReceiveInt(int) +} + +type IntReceiverImpl struct { + incrementSum func(int) +} + +func (r IntReceiverImpl) ReceiveInt(i int) { + r.incrementSum(i) +} + +type SummationReceiverExports struct { + Sum int `alloy:"sum,attr"` + Receiver IntReceiver `alloy:"receiver,attr"` +} + +type SummationReceiver struct { + opts component.Options + log log.Logger + + mut sync.RWMutex + sum atomic.Int32 + receiver IntReceiver +} + +// NewSummationReceiver creates a new summation component. +func NewSummationReceiver(o component.Options, cfg SummationReceiverConfig) (*SummationReceiver, error) { + s := &SummationReceiver{opts: o, log: o.Logger} + s.receiver = IntReceiverImpl{ + incrementSum: func(i int) { + s.sum.Add(int32(i)) + }, + } + + o.OnStateChange(SummationReceiverExports{ + Receiver: s.receiver, + }) + + return s, nil +} + +var ( + _ component.Component = (*SummationReceiver)(nil) +) + +// Run implements Component. +func (s *SummationReceiver) Run(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return nil + case <-time.After(10 * time.Millisecond): + s.mut.RLock() + s.opts.OnStateChange(SummationReceiverExports{ + Sum: int(s.sum.Load()), + Receiver: s.receiver, + }) + s.mut.RUnlock() + } + } +} + +// Update implements Component. +func (t *SummationReceiver) Update(args component.Arguments) error { + return nil +} diff --git a/internal/runtime/testdata/foreach/foreach_1.txtar b/internal/runtime/testdata/foreach/foreach_1.txtar index 647d2464d3..4ca243844e 100644 --- a/internal/runtime/testdata/foreach/foreach_1.txtar +++ b/internal/runtime/testdata/foreach/foreach_1.txtar @@ -1,17 +1,19 @@ +Foreach with only one item. The pulse will send "1" to the receiver of the summation component until it reaches 10. + -- main.alloy -- foreach "testForeach" { - collection = [2] + collection = [10] var = "num" template { - // Similar to testcomponents.summation, but with a "forward_to" - testcomponents.summation1 "sum" { - input = num - forward_to = [testcomponents.summation2.final.receiver] + testcomponents.pulse "pt" { + max = num + frequency = "10ms" + forward_to = [testcomponents.summation_receiver.sum.receiver] } } } // Similar to testcomponents.summation, but with a "receiver" export -testcomponents.summation2 "final" { +testcomponents.summation_receiver "sum" { } diff --git a/internal/runtime/testdata/foreach/foreach_2.txtar b/internal/runtime/testdata/foreach/foreach_2.txtar new file mode 100644 index 0000000000..13a45e56b7 --- /dev/null +++ b/internal/runtime/testdata/foreach/foreach_2.txtar @@ -0,0 +1,19 @@ +Foreach with two items. Both pulse components will send "1" till they both reach 5, adding to 10 in the summation component. + +-- main.alloy -- +foreach "testForeach" { + collection = [5, 5] + var = "num" + + template { + testcomponents.pulse "pt" { + max = num + frequency = "10ms" + forward_to = [testcomponents.summation_receiver.sum.receiver] + } + } +} + +// Similar to testcomponents.summation, but with a "receiver" export +testcomponents.summation_receiver "sum" { +}