Skip to content

Commit

Permalink
Fix bug which prevents config reload when metrics stage is used
Browse files Browse the repository at this point in the history
  • Loading branch information
ptodev committed Jul 8, 2024
1 parent e325e8a commit 4bc33dc
Show file tree
Hide file tree
Showing 10 changed files with 357 additions and 17 deletions.
4 changes: 4 additions & 0 deletions docs/sources/flow/reference/components/loki.process.md
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,10 @@ The following blocks are supported inside the definition of `stage.metrics`:
| metric.gauge | [metric.gauge][] | Defines a `gauge` metric. | no |
| metric.histogram | [metric.histogram][] | Defines a `histogram` metric. | no |

{{< admonition type="note" >}}
If the configuration file for {{< param "PRODUCT_ROOT_NAME" >}} is reloaded and the stages of the `loki.process` component have changed, the metrics defined in `stage.metrics` are reset.
{{< /admonition >}}

[metric.counter]: #metriccounter-block
[metric.gauge]: #metricgauge-block
[metric.histogram]: #metrichistogram-block
Expand Down
2 changes: 1 addition & 1 deletion internal/component/loki/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (c *Component) Update(args component.Arguments) error {
if err != nil {
return err
}
c.entryHandler = loki.NewEntryHandler(c.processOut, func() {})
c.entryHandler = loki.NewEntryHandler(c.processOut, func() { pipeline.Cleanup() })
c.processIn = pipeline.Wrap(c.entryHandler).Chan()
c.stages = newArgs.Stages
}
Expand Down
208 changes: 206 additions & 2 deletions internal/component/loki/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package process
import (
"context"
"os"
"strings"
"testing"
"time"

Expand All @@ -18,12 +19,15 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/river"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/goleak"
)

const logline = `{"log":"log message\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z","extra":"{\"user\":\"smith\"}"}`

func TestJSONLabelsStage(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))

Expand Down Expand Up @@ -91,7 +95,6 @@ func TestJSONLabelsStage(t *testing.T) {

// Send a log entry to the component's receiver.
ts := time.Now()
logline := `{"log":"log message\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z","extra":"{\"user\":\"smith\"}"}`
logEntry := loki.Entry{
Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"},
Entry: logproto.Entry{
Expand Down Expand Up @@ -454,7 +457,6 @@ func TestDeadlockWithFrequentUpdates(t *testing.T) {
go func() {
for {
ts := time.Now()
logline := `{"log":"log message\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z","extra":"{\"user\":\"smith\"}"}`
logEntry := loki.Entry{
Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"},
Entry: logproto.Entry{
Expand Down Expand Up @@ -486,3 +488,205 @@ func TestDeadlockWithFrequentUpdates(t *testing.T) {
time.Sleep(1 * time.Second)
require.WithinDuration(t, time.Now(), lastSend.Load().(time.Time), 300*time.Millisecond)
}

// TestMetricsStageRefresh verifies how the metrics created by a stage.metrics
// block behave across calls to Update:
//   - an update carrying the same stages keeps the current metric values,
//   - an update without a metrics stage removes the metrics,
//   - an update which adds a metrics stage back starts every counter from
//     zero, even for a metric name that was used before.
func TestMetricsStageRefresh(t *testing.T) {
	ch := loki.NewLogsReceiver()
	reg := prometheus.NewRegistry()

	stg := `
stage.metrics {
metric.counter {
name = "paulin_test"
action = "inc"
match_all = true
}
}
// This will be filled later
forward_to = []`
	var stagesCfg Arguments
	err := river.Unmarshal([]byte(stg), &stagesCfg)
	require.NoError(t, err)

	// Create and run the component, so that it can process and forward logs.
	opts := component.Options{
		Logger:        util.TestFlowLogger(t),
		Registerer:    reg,
		OnStateChange: func(e component.Exports) {},
	}
	args := Arguments{
		ForwardTo: []loki.LogsReceiver{ch},
		Stages:    stagesCfg.Stages,
	}

	c, err := New(opts, args)
	require.NoError(t, err)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go c.Run(ctx)

	// The log entry which is sent to the component's receiver throughout the test.
	ts := time.Now()
	logEntry := loki.Entry{
		Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"},
		Entry: logproto.Entry{
			Timestamp: ts,
			Line:      logline,
		},
	}

	wantLabelSet := model.LabelSet{
		"filename": "/var/log/pods/agent/agent/1.log",
		"foo":      "bar",
	}

	// expectEntries waits for n forwarded entries and checks that each one
	// carries the timestamp, line and labels of the entry that was sent.
	expectEntries := func(n int) {
		t.Helper()
		for i := 0; i < n; i++ {
			select {
			case got := <-ch.Chan():
				require.True(t, ts.Equal(got.Timestamp))
				require.Equal(t, logline, got.Line)
				require.Equal(t, wantLabelSet, got.Labels)
			case <-time.After(5 * time.Second):
				require.FailNow(t, "failed waiting for log line")
			}
		}
	}

	// expectMetrics compares everything gathered from reg with want, which is
	// in the Prometheus text format. An empty want means "no metrics at all".
	expectMetrics := func(want string) {
		t.Helper()
		if err := testutil.GatherAndCompare(reg, strings.NewReader(want)); err != nil {
			t.Fatalf("mismatch metrics: %v", err)
		}
	}

	// Send a log entry to the component's receiver.
	c.receiver.Chan() <- logEntry
	expectEntries(1)

	expectedMetrics := `
# HELP loki_process_custom_paulin_test
# TYPE loki_process_custom_paulin_test counter
loki_process_custom_paulin_test{filename="/var/log/pods/agent/agent/1.log",foo="bar"} 1
`
	expectMetrics(expectedMetrics)

	args1 := Arguments{
		ForwardTo: []loki.LogsReceiver{ch},
		Stages:    stagesCfg.Stages,
	}
	require.NoError(t, c.Update(args1))

	// The component was "updated" with the same config.
	// We expect the metric to stay the same, because the component should be smart enough to
	// know that the new config is the same as the old one and it should just keep running as it is.
	// If it resets the metric, this could cause issues with some users who have a sidecar "autoreloader"
	// which reloads the collector config every X seconds.
	// Those users wouldn't expect their metrics to be reset every time the config is reloaded.
	expectMetrics(expectedMetrics)

	// Use a config which has no metrics stage.
	// This should cause the metric to disappear.
	stg2 := `
// This will be filled later
forward_to = []`

	var stagesCfg2 Arguments
	err = river.Unmarshal([]byte(stg2), &stagesCfg2)
	require.NoError(t, err)

	args2 := Arguments{
		ForwardTo: []loki.LogsReceiver{ch},
		Stages:    stagesCfg2.Stages,
	}
	require.NoError(t, c.Update(args2))

	// Make sure there are no metrics - there is no metrics stage in the latest config.
	expectMetrics("")

	c.receiver.Chan() <- logEntry
	expectEntries(1)

	// Processing a log line must not bring the metric back - there is still
	// no metrics stage in the latest config.
	expectMetrics("")

	// Use a config which has a metric with a different name,
	// as well as a metric with the same name as before.
	// Both metrics should start counting from zero.
	stg3 := `
stage.metrics {
metric.counter {
name = "paulin_test_3"
action = "inc"
match_all = true
}
metric.counter {
name = "paulin_test"
action = "inc"
match_all = true
}
}
// This will be filled later
forward_to = []`
	var stagesCfg3 Arguments
	err = river.Unmarshal([]byte(stg3), &stagesCfg3)
	require.NoError(t, err)

	args3 := Arguments{
		ForwardTo: []loki.LogsReceiver{ch},
		Stages:    stagesCfg3.Stages,
	}
	require.NoError(t, c.Update(args3))

	// No logs have been sent since the last update.
	// Therefore, the metrics should not be visible.
	expectMetrics("")

	// Send 3 log lines.
	c.receiver.Chan() <- logEntry
	c.receiver.Chan() <- logEntry
	c.receiver.Chan() <- logEntry
	expectEntries(3)

	// Expect both counters to be "3": the value of the old "paulin_test"
	// counter was not carried over.
	expectedMetrics3 := `
# HELP loki_process_custom_paulin_test_3
# TYPE loki_process_custom_paulin_test_3 counter
loki_process_custom_paulin_test_3{filename="/var/log/pods/agent/agent/1.log",foo="bar"} 3
# HELP loki_process_custom_paulin_test
# TYPE loki_process_custom_paulin_test counter
loki_process_custom_paulin_test{filename="/var/log/pods/agent/agent/1.log",foo="bar"} 3
`
	expectMetrics(expectedMetrics3)
}
59 changes: 54 additions & 5 deletions internal/util/unregisterer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package util

import "github.com/prometheus/client_golang/prometheus"
import (
"fmt"

"github.com/hashicorp/go-multierror"
"github.com/prometheus/client_golang/prometheus"
)

// Unregisterer is a Prometheus Registerer that can unregister all collectors
// passed to it.
Expand All @@ -18,6 +23,40 @@ func WrapWithUnregisterer(reg prometheus.Registerer) *Unregisterer {
}
}

// collectorDescs returns every descriptor which c sends from Describe.
//
// Describe runs in its own goroutine while this function drains the channel
// until it is closed, so a collector which sends more descriptors than the
// channel buffer holds does not block.
func collectorDescs(c prometheus.Collector) []*prometheus.Desc {
	descChan := make(chan *prometheus.Desc, 10)
	go func() {
		c.Describe(descChan)
		close(descChan)
	}()

	var descs []*prometheus.Desc
	for desc := range descChan {
		descs = append(descs, desc)
	}
	return descs
}

// describeCollector returns a printable list of the descriptors of c,
// for use in error messages.
func describeCollector(c prometheus.Collector) string {
	all := collectorDescs(c)
	descs := make([]string, 0, len(all))
	for _, desc := range all {
		descs = append(descs, desc.String())
	}

	return fmt.Sprintf("%v", descs)
}

// isUncheckedCollector reports whether c sends no descriptors at all from
// Describe.
func isUncheckedCollector(c prometheus.Collector) bool {
	return len(collectorDescs(c)) == 0
}

// Register implements prometheus.Registerer.
func (u *Unregisterer) Register(c prometheus.Collector) error {
if u.wrap == nil {
Expand All @@ -28,6 +67,11 @@ func (u *Unregisterer) Register(c prometheus.Collector) error {
if err != nil {
return err
}

if isUncheckedCollector(c) {
return nil
}

u.cs[c] = struct{}{}
return nil
}
Expand All @@ -43,6 +87,10 @@ func (u *Unregisterer) MustRegister(cs ...prometheus.Collector) {

// Unregister implements prometheus.Registerer.
func (u *Unregisterer) Unregister(c prometheus.Collector) bool {
if isUncheckedCollector(c) {
return true
}

if u.wrap != nil && u.wrap.Unregister(c) {
delete(u.cs, c)
return true
Expand All @@ -52,12 +100,13 @@ func (u *Unregisterer) Unregister(c prometheus.Collector) bool {

// UnregisterAll unregisters all collectors that were registered through the
// Registerer.
func (u *Unregisterer) UnregisterAll() bool {
success := true
func (u *Unregisterer) UnregisterAll() error {
var multiErr error
for c := range u.cs {
if !u.Unregister(c) {
success = false
err := fmt.Errorf("failed to unregister collector %v", describeCollector(c))
multiErr = multierror.Append(multiErr, err)
}
}
return success
return multiErr
}
35 changes: 35 additions & 0 deletions internal/util/unregisterer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package util

import (
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)

// Test_UnregisterTwice checks that a registered collector can be unregistered
// exactly once: the first Unregister succeeds and the second reports false.
func Test_UnregisterTwice(t *testing.T) {
	u := WrapWithUnregisterer(prometheus.NewRegistry())
	c := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "test_metric",
		Help: "Test metric.",
	})
	// A failed registration would make the assertions below meaningless,
	// so stop right away if it happens.
	require.NoError(t, u.Register(c))
	require.True(t, u.Unregister(c))
	require.False(t, u.Unregister(c))
}

// uncheckedCollector is a test collector which sends no descriptors and no
// metrics.
type uncheckedCollector struct{}

// Compile-time check that uncheckedCollector implements prometheus.Collector.
var _ prometheus.Collector = uncheckedCollector{}

// Describe implements prometheus.Collector. It sends nothing.
func (uncheckedCollector) Describe(_ chan<- *prometheus.Desc) {}

// Collect implements prometheus.Collector. It sends nothing.
func (uncheckedCollector) Collect(_ chan<- prometheus.Metric) {}

// Test_UncheckedCollector checks that unregistering a collector which sends
// no descriptors reports success every time, even when repeated.
func Test_UncheckedCollector(t *testing.T) {
	u := WrapWithUnregisterer(prometheus.NewRegistry())
	c := uncheckedCollector{}
	// Registration must succeed for the Unregister assertions to be meaningful.
	require.NoError(t, u.Register(c))
	require.True(t, u.Unregister(c))
	require.True(t, u.Unregister(c))
}
Loading

0 comments on commit 4bc33dc

Please sign in to comment.