diff --git a/internal/component/loki/process/process_test.go b/internal/component/loki/process/process_test.go index 3c18a7f513f7..da9998b49d9b 100644 --- a/internal/component/loki/process/process_test.go +++ b/internal/component/loki/process/process_test.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "strings" + "sync" "testing" "time" @@ -316,6 +317,8 @@ stage.labels { } func TestEntrySentToTwoProcessComponents(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) + // Set up two different loki.process components. stg1 := ` forward_to = [] @@ -337,13 +340,16 @@ stage.static_labels { args1.ForwardTo = []loki.LogsReceiver{ch1} args2.ForwardTo = []loki.LogsReceiver{ch2} + ctx, ctxCancel := context.WithCancel(context.Background()) + defer ctxCancel() + // Start the loki.process components. tc1, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.process") require.NoError(t, err) tc2, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.process") require.NoError(t, err) - go func() { require.NoError(t, tc1.Run(componenttest.TestContext(t), args1)) }() - go func() { require.NoError(t, tc2.Run(componenttest.TestContext(t), args2)) }() + go func() { require.NoError(t, tc1.Run(ctx, args1)) }() + go func() { require.NoError(t, tc2.Run(ctx, args2)) }() require.NoError(t, tc1.WaitExports(time.Second)) require.NoError(t, tc2.WaitExports(time.Second)) @@ -357,7 +363,7 @@ stage.static_labels { require.NoError(t, err) go func() { - err := ctrl.Run(context.Background(), lsf.Arguments{ + err := ctrl.Run(ctx, lsf.Arguments{ Targets: []discovery.Target{{"__path__": f.Name(), "somelbl": "somevalue"}}, ForwardTo: []loki.LogsReceiver{ tc1.Exports().(Exports).Receiver, @@ -395,68 +401,102 @@ stage.static_labels { } } -func TestDeadlockWithFrequentUpdates(t *testing.T) { - stg := `stage.json { - expressions = {"output" = "log", stream = "stream", timestamp = "time", "extra" = "" } - drop_malformed = true - } - stage.json { - expressions = { "user" = "" } - source = "extra" - } - stage.labels { - values = { - stream = "", - user = "", - ts = "timestamp", - } - }` +type testFrequentUpdate struct { + t *testing.T + c *Component - // Unmarshal the River relabel rules into a custom struct, as we don't have - // an easy way to refer to a loki.LogsReceiver value for the forward_to - // argument. - type cfg struct { - Stages []stages.StageConfig `river:"stage,enum"` + receiver1 loki.LogsReceiver + receiver2 loki.LogsReceiver + + keepSending atomic.Bool + keepReceiving atomic.Bool + keepUpdating atomic.Bool + + wgLogSend sync.WaitGroup + wgRun sync.WaitGroup + wgUpdate sync.WaitGroup + + lastSend atomic.Value + + stop func() +} + +func startTestFrequentUpdate(t *testing.T, cfg string) *testFrequentUpdate { + res := testFrequentUpdate{ + t: t, + receiver1: loki.NewLogsReceiver(), + receiver2: loki.NewLogsReceiver(), } - var stagesCfg cfg - err := river.Unmarshal([]byte(stg), &stagesCfg) + + ctx, cancel := context.WithCancel(context.Background()) + + res.keepSending.Store(true) + res.keepReceiving.Store(true) + res.keepUpdating.Store(true) + + res.stop = func() { + res.keepUpdating.Store(false) + res.wgUpdate.Wait() + + res.keepSending.Store(false) + res.wgLogSend.Wait() + + cancel() + res.wgRun.Wait() + + close(res.receiver1.Chan()) + close(res.receiver2.Chan()) + } + + var args Arguments + err := river.Unmarshal([]byte(cfg), &args) require.NoError(t, err) - ch1, ch2 := loki.NewLogsReceiver(), loki.NewLogsReceiver() + args.ForwardTo = []loki.LogsReceiver{res.receiver1, res.receiver2} - // Create and run the component, so that it can process and forwards logs. opts := component.Options{ Logger: util.TestFlowLogger(t), Registerer: prometheus.NewRegistry(), OnStateChange: func(e component.Exports) {}, } - args := Arguments{ - ForwardTo: []loki.LogsReceiver{ch1, ch2}, - Stages: stagesCfg.Stages, - } - c, err := New(opts, args) + res.c, err = New(opts, args) require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - go c.Run(ctx) - var lastSend atomic.Value - // Drain received logs + res.wgRun.Add(1) + go func() { + res.c.Run(ctx) + res.wgRun.Done() + }() + + return &res +} + +// Continuously receive the logs from both channels +func (r *testFrequentUpdate) drainLogs() { + drainLogs := func() { + r.lastSend.Store(time.Now()) + } + + r.wgLogSend.Add(1) go func() { - for { + for r.keepReceiving.Load() { select { - case <-ch1.Chan(): - lastSend.Store(time.Now()) - case <-ch2.Chan(): - lastSend.Store(time.Now()) + case <-r.receiver1.Chan(): + drainLogs() + case <-r.receiver2.Chan(): + drainLogs() } } + r.wgLogSend.Done() }() +} - // Continuously send entries to both channels +// Continuously send entries to both channels +func (r *testFrequentUpdate) sendLogs() { + r.wgLogSend.Add(1) go func() { - for { + for r.keepSending.Load() { ts := time.Now() logEntry := loki.Entry{ Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"}, @@ -465,28 +505,89 @@ func TestDeadlockWithFrequentUpdates(t *testing.T) { Line: logline, }, } - c.receiver.Chan() <- logEntry + select { + case r.c.receiver.Chan() <- logEntry: + default: + // continue + } } + r.keepReceiving.Store(false) + r.wgLogSend.Done() }() +} - // Call Updates - args1 := Arguments{ - ForwardTo: []loki.LogsReceiver{ch1}, - Stages: stagesCfg.Stages, - } - args2 := Arguments{ - ForwardTo: []loki.LogsReceiver{ch2}, - Stages: stagesCfg.Stages, - } +func (r *testFrequentUpdate) updateContinuously(cfg1, cfg2 string) { + var args1 Arguments + err := river.Unmarshal([]byte(cfg1), &args1) + require.NoError(r.t, err) + args1.ForwardTo = []loki.LogsReceiver{r.receiver1} + + var args2 Arguments + err = river.Unmarshal([]byte(cfg2), &args2) + require.NoError(r.t, err) + args2.ForwardTo = []loki.LogsReceiver{r.receiver2} + + r.wgUpdate.Add(1) go func() { - for { - c.Update(args1) - c.Update(args2) + for r.keepUpdating.Load() { + r.c.Update(args1) + r.c.Update(args2) } + r.wgUpdate.Done() }() +} + +func TestDeadlockWithFrequentUpdates(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) + + cfg1 := `stage.json { + expressions = {"output" = "log", stream = "stream", timestamp = "time", "extra" = "" } + drop_malformed = true + } + stage.json { + expressions = { "user" = "" } + source = "extra" + } + stage.labels { + values = { + stream = "", + user = "", + ts = "timestamp", + } + } + forward_to = []` + + cfg2 := `stage.json { + expressions = {"output" = "log", stream = "stream", timestamp = "time", "extra" = "" } + drop_malformed = true + } + stage.labels { + values = { + stream = "", + ts = "timestamp", + } + } + forward_to = []` + + r := startTestFrequentUpdate(t, `forward_to = []`) + + // Continuously send entries to both channels + r.sendLogs() + + // Continuously receive entries on both channels + r.drainLogs() + + // Call Updates + r.updateContinuously(cfg1, cfg2) // Run everything for a while time.Sleep(1 * time.Second) + require.WithinDuration(t, time.Now(), r.lastSend.Load().(time.Time), 300*time.Millisecond) + + // Clean up + r.stop() +} + // Make sure there are no goroutine leaks when the config is updated. // Goroutine leaks often cause memory leaks. func TestLeakyUpdate(t *testing.T) {