diff --git a/docs/sources/flow/reference/components/loki.process.md b/docs/sources/flow/reference/components/loki.process.md index 05eb467d63c2..7ab8b2ec45f4 100644 --- a/docs/sources/flow/reference/components/loki.process.md +++ b/docs/sources/flow/reference/components/loki.process.md @@ -717,6 +717,10 @@ The following blocks are supported inside the definition of `stage.metrics`: | metric.gauge | [metric.gauge][] | Defines a `gauge` metric. | no | | metric.histogram | [metric.histogram][] | Defines a `histogram` metric. | no | +{{< admonition type="note" >}} +If the configuration file for {{< param "PRODUCT_ROOT_NAME" >}} is reloaded, the metrics will be reset. +{{< /admonition >}} + [metric.counter]: #metriccounter-block [metric.gauge]: #metricgauge-block [metric.histogram]: #metrichistogram-block diff --git a/internal/component/loki/process/process.go b/internal/component/loki/process/process.go index b084582a951d..b7d0e8eb1b47 100644 --- a/internal/component/loki/process/process.go +++ b/internal/component/loki/process/process.go @@ -127,7 +127,7 @@ func (c *Component) Update(args component.Arguments) error { if err != nil { return err } - c.entryHandler = loki.NewEntryHandler(c.processOut, func() {}) + c.entryHandler = loki.NewEntryHandler(c.processOut, func() { pipeline.Cleanup() }) c.processIn = pipeline.Wrap(c.entryHandler).Chan() c.stages = newArgs.Stages } diff --git a/internal/component/loki/process/process_test.go b/internal/component/loki/process/process_test.go index 8a423c02af73..644c0b9363ec 100644 --- a/internal/component/loki/process/process_test.go +++ b/internal/component/loki/process/process_test.go @@ -5,6 +5,7 @@ package process import ( "context" "os" + "strings" "testing" "time" @@ -18,12 +19,15 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/river" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "go.uber.org/atomic" "go.uber.org/goleak" ) +const logline = `{"log":"log message\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z","extra":"{\"user\":\"smith\"}"}` + func TestJSONLabelsStage(t *testing.T) { defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) @@ -91,7 +95,6 @@ func TestJSONLabelsStage(t *testing.T) { // Send a log entry to the component's receiver. ts := time.Now() - logline := `{"log":"log message\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z","extra":"{\"user\":\"smith\"}"}` logEntry := loki.Entry{ Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"}, Entry: logproto.Entry{ @@ -454,7 +457,6 @@ func TestDeadlockWithFrequentUpdates(t *testing.T) { go func() { for { ts := time.Now() - logline := `{"log":"log message\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z","extra":"{\"user\":\"smith\"}"}` logEntry := loki.Entry{ Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"}, Entry: logproto.Entry{ @@ -486,3 +488,205 @@ func TestDeadlockWithFrequentUpdates(t *testing.T) { time.Sleep(1 * time.Second) require.WithinDuration(t, time.Now(), lastSend.Load().(time.Time), 300*time.Millisecond) } + +func TestMetricsStageRefresh(t *testing.T) { + ch := loki.NewLogsReceiver() + reg := prometheus.NewRegistry() + + stg := ` + stage.metrics { + metric.counter { + name = "paulin_test" + action = "inc" + match_all = true + } + } + + // This will be filled later + forward_to = []` + var stagesCfg Arguments + err := river.Unmarshal([]byte(stg), &stagesCfg) + require.NoError(t, err) + + // Create and run the component, so that it can process and forwards logs. + opts := component.Options{ + Logger: util.TestFlowLogger(t), + Registerer: reg, + OnStateChange: func(e component.Exports) {}, + } + args := Arguments{ + ForwardTo: []loki.LogsReceiver{ch}, + Stages: stagesCfg.Stages, + } + + c, err := New(opts, args) + require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go c.Run(ctx) + + // Send a log entry to the component's receiver. + ts := time.Now() + logEntry := loki.Entry{ + Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"}, + Entry: logproto.Entry{ + Timestamp: ts, + Line: logline, + }, + } + + c.receiver.Chan() <- logEntry + + wantLabelSet := model.LabelSet{ + "filename": "/var/log/pods/agent/agent/1.log", + "foo": "bar", + } + + for i := 0; i < 1; i++ { + select { + case logEntry := <-ch.Chan(): + require.True(t, ts.Equal(logEntry.Timestamp)) + require.Equal(t, logline, logEntry.Line) + require.Equal(t, wantLabelSet, logEntry.Labels) + case <-time.After(5 * time.Second): + require.FailNow(t, "failed waiting for log line") + } + } + + expectedMetrics := ` +# HELP loki_process_custom_paulin_test +# TYPE loki_process_custom_paulin_test counter +loki_process_custom_paulin_test{filename="/var/log/pods/agent/agent/1.log",foo="bar"} 1 +` + + if err := testutil.GatherAndCompare(reg, + strings.NewReader(expectedMetrics)); err != nil { + t.Fatalf("mismatch metrics: %v", err) + } + + args1 := Arguments{ + ForwardTo: []loki.LogsReceiver{ch}, + Stages: stagesCfg.Stages, + } + c.Update(args1) + + // The component was "updated" with the same config. + // We expect the metric to stay the same, because the component should be smart enough to + // know that the new config is the same as the old one and it should just keep running as it is. + // If it resets the metric, this could cause issues with some users who have a sidecar "autoreloader" + // which reloads the collector config every X seconds. + // Those users wouldn't expect their metrics to be reset every time the config is reloaded. + if err := testutil.GatherAndCompare(reg, + strings.NewReader(expectedMetrics)); err != nil { + t.Fatalf("mismatch metrics: %v", err) + } + + // Use a config which has no metrics stage. + // This should cause the metric to disappear. + stg2 := ` + // This will be filled later + forward_to = []` + + var stagesCfg2 Arguments + err = river.Unmarshal([]byte(stg2), &stagesCfg2) + require.NoError(t, err) + + args2 := Arguments{ + ForwardTo: []loki.LogsReceiver{ch}, + Stages: stagesCfg2.Stages, + } + + c.Update(args2) + + // Make sure there are no metrics - there is no metrics stage in the latest config. + if err := testutil.GatherAndCompare(reg, + strings.NewReader("")); err != nil { + t.Fatalf("mismatch metrics: %v", err) + } + + c.receiver.Chan() <- logEntry + + for i := 0; i < 1; i++ { + select { + case logEntry := <-ch.Chan(): + require.True(t, ts.Equal(logEntry.Timestamp)) + require.Equal(t, logline, logEntry.Line) + require.Equal(t, wantLabelSet, logEntry.Labels) + case <-time.After(5 * time.Second): + require.FailNow(t, "failed waiting for log line") + } + } + + // Make sure there are no metrics - there is no metrics stage in the latest config. + if err := testutil.GatherAndCompare(reg, + strings.NewReader("")); err != nil { + t.Fatalf("mismatch metrics: %v", err) + } + + // Use a config which has a metric with a different name, + // as well as a metric with the same name. + // Only the new metric should be visible. + stg3 := ` + stage.metrics { + metric.counter { + name = "paulin_test_3" + action = "inc" + match_all = true + } + metric.counter { + name = "paulin_test" + action = "inc" + match_all = true + } + } + + // This will be filled later + forward_to = []` + var stagesCfg3 Arguments + err = river.Unmarshal([]byte(stg3), &stagesCfg3) + require.NoError(t, err) + + args3 := Arguments{ + ForwardTo: []loki.LogsReceiver{ch}, + Stages: stagesCfg3.Stages, + } + c.Update(args3) + + // No logs have been sent since the last update. + // Therefore, the metric should not be visible. + if err := testutil.GatherAndCompare(reg, + strings.NewReader("")); err != nil { + t.Fatalf("mismatch metrics: %v", err) + } + + // Send 3 log lines. + c.receiver.Chan() <- logEntry + c.receiver.Chan() <- logEntry + c.receiver.Chan() <- logEntry + + for i := 0; i < 3; i++ { + select { + case logEntry := <-ch.Chan(): + require.True(t, ts.Equal(logEntry.Timestamp)) + require.Equal(t, logline, logEntry.Line) + require.Equal(t, wantLabelSet, logEntry.Labels) + case <-time.After(5 * time.Second): + require.FailNow(t, "failed waiting for log line") + } + } + + // Expect the metric counter to be "3". + expectedMetrics3 := ` +# HELP loki_process_custom_paulin_test_3 +# TYPE loki_process_custom_paulin_test_3 counter +loki_process_custom_paulin_test_3{filename="/var/log/pods/agent/agent/1.log",foo="bar"} 3 +# HELP loki_process_custom_paulin_test +# TYPE loki_process_custom_paulin_test counter +loki_process_custom_paulin_test{filename="/var/log/pods/agent/agent/1.log",foo="bar"} 3 +` + + if err := testutil.GatherAndCompare(reg, + strings.NewReader(expectedMetrics3)); err != nil { + t.Fatalf("mismatch metrics: %v", err) + } +} diff --git a/internal/util/unregisterer.go b/internal/util/unregisterer.go index 822132b01785..792ddfc6e617 100644 --- a/internal/util/unregisterer.go +++ b/internal/util/unregisterer.go @@ -1,6 +1,11 @@ package util -import "github.com/prometheus/client_golang/prometheus" +import ( + "fmt" + + "github.com/hashicorp/go-multierror" + "github.com/prometheus/client_golang/prometheus" +) // Unregisterer is a Prometheus Registerer that can unregister all collectors // passed to it. @@ -18,6 +23,40 @@ func WrapWithUnregisterer(reg prometheus.Registerer) *Unregisterer { } } +func describeCollector(c prometheus.Collector) string { + var ( + descChan = make(chan *prometheus.Desc, 10) + ) + go func() { + c.Describe(descChan) + close(descChan) + }() + + descs := make([]string, 0) + for desc := range descChan { + descs = append(descs, desc.String()) + } + + return fmt.Sprintf("%v", descs) +} + +func isUncheckedCollector(c prometheus.Collector) bool { + var ( + descChan = make(chan *prometheus.Desc, 10) + ) + go func() { + c.Describe(descChan) + close(descChan) + }() + + i := 0 + for range descChan { + i += 1 + } + + return i == 0 +} + // Register implements prometheus.Registerer. func (u *Unregisterer) Register(c prometheus.Collector) error { if u.wrap == nil { @@ -28,6 +67,11 @@ func (u *Unregisterer) Register(c prometheus.Collector) error { if err != nil { return err } + + if isUncheckedCollector(c) { + return nil + } + u.cs[c] = struct{}{} return nil } @@ -43,6 +87,10 @@ func (u *Unregisterer) MustRegister(cs ...prometheus.Collector) { // Unregister implements prometheus.Registerer. func (u *Unregisterer) Unregister(c prometheus.Collector) bool { + if isUncheckedCollector(c) { + return true + } + if u.wrap != nil && u.wrap.Unregister(c) { delete(u.cs, c) return true @@ -52,12 +100,13 @@ func (u *Unregisterer) Unregister(c prometheus.Collector) bool { // UnregisterAll unregisters all collectors that were registered through the // Registerer. -func (u *Unregisterer) UnregisterAll() bool { - success := true +func (u *Unregisterer) UnregisterAll() error { + var multiErr error for c := range u.cs { if !u.Unregister(c) { - success = false + err := fmt.Errorf("failed to unregister collector %v", describeCollector(c)) + multiErr = multierror.Append(multiErr, err) } } - return success + return multiErr } diff --git a/internal/util/unregisterer_test.go b/internal/util/unregisterer_test.go new file mode 100644 index 000000000000..70fc606142bd --- /dev/null +++ b/internal/util/unregisterer_test.go @@ -0,0 +1,35 @@ +package util + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" +) + +func Test_UnregisterTwice(t *testing.T) { + u := WrapWithUnregisterer(prometheus.NewRegistry()) + c := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "test_metric", + Help: "Test metric.", + }) + u.Register(c) + require.True(t, u.Unregister(c)) + require.False(t, u.Unregister(c)) +} + +type uncheckedCollector struct{} + +func (uncheckedCollector) Describe(chan<- *prometheus.Desc) {} + +func (uncheckedCollector) Collect(chan<- prometheus.Metric) {} + +var _ prometheus.Collector = uncheckedCollector{} + +func Test_UncheckedCollector(t *testing.T) { + u := WrapWithUnregisterer(prometheus.NewRegistry()) + c := uncheckedCollector{} + u.Register(c) + require.True(t, u.Unregister(c)) + require.True(t, u.Unregister(c)) +} diff --git a/static/logs/logs.go b/static/logs/logs.go index 2d6c478fe510..d5af619a4b1f 100644 --- a/static/logs/logs.go +++ b/static/logs/logs.go @@ -173,10 +173,10 @@ func (i *Instance) ApplyConfig(c *InstanceConfig, g GlobalConfig, dryRun bool) e } // Unregister all existing metrics before trying to create a new instance. - if !i.reg.UnregisterAll() { + if err := i.reg.UnregisterAll(); err != nil { // If UnregisterAll fails, we need to abort, otherwise the new promtail // would try to re-register an existing metric and might panic. - return fmt.Errorf("failed to unregister all metrics from previous promtail. THIS IS A BUG") + return fmt.Errorf("failed to unregister all metrics from previous promtail: %w", err) } if len(c.ClientConfigs) == 0 { @@ -244,5 +244,5 @@ func (i *Instance) Stop() { i.promtail.Shutdown() i.promtail = nil } - i.reg.UnregisterAll() + _ = i.reg.UnregisterAll() } diff --git a/static/logs/logs_test.go b/static/logs/logs_test.go index 255c99b55f59..48b460ed679e 100644 --- a/static/logs/logs_test.go +++ b/static/logs/logs_test.go @@ -18,6 +18,7 @@ import ( "github.com/grafana/agent/internal/util" "github.com/grafana/loki/pkg/logproto" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" "gopkg.in/yaml.v2" ) @@ -31,6 +32,8 @@ func TestLogs_NilConfig(t *testing.T) { } func TestLogs(t *testing.T) { + reg := prometheus.NewRegistry() + // // Create a temporary file to tail // @@ -80,7 +83,17 @@ configs: labels: job: test __path__: %s - `, positionsDir, lis.Addr().String(), tmpFile.Name())) + pipeline_stages: + - metrics: + log_lines_total: + type: Counter + description: "total number of log lines" + prefix: my_promtail_custom_ + max_idle_duration: 24h + config: + match_all: true + action: inc +`, positionsDir, lis.Addr().String(), tmpFile.Name())) var cfg Config dec := yaml.NewDecoder(strings.NewReader(cfgText)) @@ -88,7 +101,7 @@ configs: require.NoError(t, dec.Decode(&cfg)) require.NoError(t, cfg.ApplyDefaults()) logger := log.NewSyncLogger(log.NewNopLogger()) - l, err := New(prometheus.NewRegistry(), &cfg, logger, false) + l, err := New(reg, &cfg, logger, false) require.NoError(t, err) defer l.Stop() @@ -103,6 +116,20 @@ configs: require.Equal(t, "Hello, world!", req.Streams[0].Entries[0].Line) } + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` +# HELP my_promtail_custom_log_lines_total total number of log lines +# TYPE my_promtail_custom_log_lines_total counter +my_promtail_custom_log_lines_total{filename="`+tmpFile.Name()+`",job="test",logs_config="default"} 1 +`), "my_promtail_custom_log_lines_total")) + + // + // Apply the same config again, to make sure nothing happened. + // + require.NoError(t, l.ApplyConfig(&cfg, false)) + + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(``), + "my_promtail_custom_log_lines_total")) + // // Apply a new config and write a new line. // @@ -121,7 +148,17 @@ configs: labels: job: test-2 __path__: %s - `, positionsDir, lis.Addr().String(), tmpFile.Name())) + pipeline_stages: + - metrics: + log_lines_total2: + type: Counter + description: "total number of log lines" + prefix: my_promtail_custom2_ + max_idle_duration: 24h + config: + match_all: true + action: inc +`, positionsDir, lis.Addr().String(), tmpFile.Name())) var newCfg Config dec = yaml.NewDecoder(strings.NewReader(cfgText)) @@ -130,6 +167,9 @@ configs: require.NoError(t, newCfg.ApplyDefaults()) require.NoError(t, l.ApplyConfig(&newCfg, false)) + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(``), + "my_promtail_custom_log_lines_total", "my_promtail_custom2_log_lines_total2")) + fmt.Fprintf(tmpFile, "Hello again!\n") select { case <-time.After(time.Second * 30): @@ -138,6 +178,12 @@ configs: require.Equal(t, "Hello again!", req.Streams[0].Entries[0].Line) } + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP my_promtail_custom2_log_lines_total2 total number of log lines + # TYPE my_promtail_custom2_log_lines_total2 counter + my_promtail_custom2_log_lines_total2{filename="`+tmpFile.Name()+`",job="test-2",logs_config="default"} 1 + `), "my_promtail_custom_log_lines_total", "my_promtail_custom2_log_lines_total2")) + t.Run("update to nil", func(t *testing.T) { // Applying a nil config should remove all instances. err := l.ApplyConfig(nil, false) diff --git a/static/metrics/cluster/node.go b/static/metrics/cluster/node.go index 0b1e6fa55a68..b8e8c3e27827 100644 --- a/static/metrics/cluster/node.go +++ b/static/metrics/cluster/node.go @@ -85,7 +85,7 @@ func (n *node) ApplyConfig(cfg Config) error { level.Info(n.log).Log("msg", "applying config") // Shut down old components before re-creating the updated ones. - n.reg.UnregisterAll() + _ = n.reg.UnregisterAll() if n.lc != nil { // Note that this will call performClusterReshard and will block until it diff --git a/static/metrics/instance/configstore/remote.go b/static/metrics/instance/configstore/remote.go index 7307bd3ccbbe..7a303b4552be 100644 --- a/static/metrics/instance/configstore/remote.go +++ b/static/metrics/instance/configstore/remote.go @@ -96,7 +96,7 @@ func (r *Remote) ApplyConfig(cfg kv.Config, enable bool) error { } // Unregister all metrics that the previous kv may have registered. - r.reg.UnregisterAll() + _ = r.reg.UnregisterAll() if !enable { r.setClient(nil, nil, kv.Config{}) diff --git a/static/metrics/instance/instance.go b/static/metrics/instance/instance.go index 1e264ca76b8c..79f3482ae615 100644 --- a/static/metrics/instance/instance.go +++ b/static/metrics/instance/instance.go @@ -304,7 +304,9 @@ func (i *Instance) Run(ctx context.Context) error { // exits, any metrics Prometheus registers are removed and can be // re-registered if Run is called again. trackingReg := util.WrapWithUnregisterer(i.reg) - defer trackingReg.UnregisterAll() + defer func() { + _ = trackingReg.UnregisterAll() + }() if err := i.initialize(ctx, trackingReg, &cfg); err != nil { level.Error(i.logger).Log("msg", "failed to initialize instance", "err", err)