Skip to content

Commit

Permalink
rework foreach txtar tests
Browse files Browse the repository at this point in the history
  • Loading branch information
wildum committed Jan 7, 2025
1 parent 8217e86 commit 57e22d1
Show file tree
Hide file tree
Showing 8 changed files with 227 additions and 190 deletions.
17 changes: 4 additions & 13 deletions internal/runtime/foreach_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ package runtime_test
import (
"context"
"path/filepath"
"strings"
"sync"
"testing"
"time"

"github.com/grafana/alloy/internal/runtime/internal/testcomponents"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/component-base/metrics/testutil"
)

// TODO: Test a foreach inside a foreach.
Expand Down Expand Up @@ -47,16 +45,9 @@ func testConfigForEach(t *testing.T, reg *prometheus.Registry, config string, re
ctrl.Run(ctx)
}()

// Check for initial condition
require.EventuallyWithT(t, func(c *assert.CollectT) {
expectedMetrics := `
# HELP testcomponents_summation2 Summation of all integers received
# TYPE testcomponents_summation2 counter
testcomponents_summation2{component_id="testcomponents.summation2.final",component_path="/"} 2
`
if err := testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "testcomponents_summation2_total"); err != nil {
c.Errorf("mismatch metrics: %v", err)
}
require.Eventually(t, func() bool {
export := getExport[testcomponents.SummationReceiverExports](t, ctrl, "", "testcomponents.summation_receiver.sum")
return export.Sum == 10
}, 3*time.Second, 10*time.Millisecond)

// if update != nil {
Expand Down
15 changes: 9 additions & 6 deletions internal/runtime/internal/testcomponents/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func init() {
type CountConfig struct {
Frequency time.Duration `alloy:"frequency,attr"`
Max int `alloy:"max,attr"`
ForwardTo []IntReceiver `alloy:"forward_to,attr,optional"`
}

type CountExports struct {
Expand Down Expand Up @@ -63,18 +64,20 @@ func (t *Count) Run(ctx context.Context) error {
return nil
case <-time.After(t.getNextCount()):
t.cfgMut.Lock()
maxCount := t.cfg.Max
t.cfgMut.Unlock()

currentCount := t.count.Load()
if maxCount == 0 || currentCount < int32(maxCount) {
if t.cfg.Max == 0 || currentCount < int32(t.cfg.Max) {
if t.count.CompareAndSwap(currentCount, currentCount+1) {
level.Info(t.log).Log("msg", "incremented count", "count", currentCount+1)
t.opts.OnStateChange(CountExports{Count: int(currentCount + 1)})
newCount := int(currentCount + 1)
level.Info(t.log).Log("msg", "incremented count", "count", newCount)
t.opts.OnStateChange(CountExports{Count: newCount})
for _, r := range t.cfg.ForwardTo {
r.ReceiveInt(newCount)
}
} else {
level.Info(t.log).Log("msg", "failed to increment count", "count", currentCount)
}
}
t.cfgMut.Unlock()
}
}
}
Expand Down
91 changes: 91 additions & 0 deletions internal/runtime/internal/testcomponents/pulse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package testcomponents

import (
"context"
"fmt"
"sync"
"time"

"github.com/go-kit/log"
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/runtime/logging/level"
)

func init() {
component.Register(component.Registration{
Name: "testcomponents.pulse",
Stability: featuregate.StabilityPublicPreview,
Args: PulseConfig{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return NewPulse(opts, args.(PulseConfig))
},
})
}

type PulseConfig struct {
Frequency time.Duration `alloy:"frequency,attr"`
Max int `alloy:"max,attr"`
ForwardTo []IntReceiver `alloy:"forward_to,attr,optional"`
}

type Pulse struct {
opts component.Options
log log.Logger

cfgMut sync.Mutex
cfg PulseConfig
count int
}

func NewPulse(o component.Options, cfg PulseConfig) (*Pulse, error) {
t := &Pulse{opts: o, log: o.Logger}
if err := t.Update(cfg); err != nil {
return nil, err
}
return t, nil
}

var (
_ component.Component = (*Pulse)(nil)
)

func (p *Pulse) Run(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case <-time.After(p.getNextPulse()):
p.cfgMut.Lock()
if p.cfg.Max == 0 || p.count < p.cfg.Max {
for _, r := range p.cfg.ForwardTo {
r.ReceiveInt(1)
}
p.count++
}
p.cfgMut.Unlock()
}
}
}

func (t *Pulse) getNextPulse() time.Duration {
t.cfgMut.Lock()
defer t.cfgMut.Unlock()
return t.cfg.Frequency
}

// Update implements Component.
func (t *Pulse) Update(args component.Arguments) error {
t.cfgMut.Lock()
defer t.cfgMut.Unlock()

cfg := args.(PulseConfig)
if cfg.Frequency == 0 {
return fmt.Errorf("frequency must not be 0")
}

level.Info(t.log).Log("msg", "setting count frequency", "freq", cfg.Frequency)
t.cfg = cfg
return nil
}
66 changes: 0 additions & 66 deletions internal/runtime/internal/testcomponents/sumation1.go

This file was deleted.

99 changes: 0 additions & 99 deletions internal/runtime/internal/testcomponents/sumation2.go

This file was deleted.

Loading

0 comments on commit 57e22d1

Please sign in to comment.