From 543104a28408d02698c5488723ac229fa8d3906c Mon Sep 17 00:00:00 2001 From: Pablo Balbi Date: Thu, 2 Nov 2023 19:40:51 -0300 Subject: [PATCH 1/6] added last read wrote metrics --- component/common/loki/client/client.go | 1 + component/common/loki/client/manager.go | 3 +- component/common/loki/client/metrics.go | 34 +++++++++++++++++++ component/common/loki/client/queue_client.go | 26 ++++++++++---- .../common/loki/client/queue_client_test.go | 6 ++-- component/common/loki/wal/watcher_metrics.go | 2 ++ component/common/loki/wal/writer.go | 11 ++++++ 7 files changed, 73 insertions(+), 10 deletions(-) create mode 100644 component/common/loki/client/metrics.go diff --git a/component/common/loki/client/client.go b/component/common/loki/client/client.go index 36839dbcffa3..208e7c45900c 100644 --- a/component/common/loki/client/client.go +++ b/component/common/loki/client/client.go @@ -58,6 +58,7 @@ type Metrics struct { mutatedBytes *prometheus.CounterVec requestDuration *prometheus.HistogramVec batchRetries *prometheus.CounterVec + lastReadTimestamp *prometheus.GaugeVec countersWithHost []*prometheus.CounterVec countersWithHostTenant []*prometheus.CounterVec countersWithHostTenantReason []*prometheus.CounterVec diff --git a/component/common/loki/client/manager.go b/component/common/loki/client/manager.go index 290ddcc0f951..6683e9e5772b 100644 --- a/component/common/loki/client/manager.go +++ b/component/common/loki/client/manager.go @@ -70,6 +70,7 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr walWatcherMetrics := wal.NewWatcherMetrics(reg) walMarkerMetrics := internal.NewMarkerMetrics(reg) + queueClientMetrics := NewQueueClientMetrics(reg) if len(clientCfgs) == 0 { return nil, fmt.Errorf("at least one client config must be provided") @@ -98,7 +99,7 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr } markerHandler := internal.NewMarkerHandler(markerFileHandler, walCfg.MaxSegmentAge, logger, 
walMarkerMetrics.WithCurriedId(clientName)) - queue, err := NewQueue(metrics, cfg, limits.MaxStreams, limits.MaxLineSize.Val(), limits.MaxLineSizeTruncate, logger, markerHandler) + queue, err := NewQueue(metrics, queueClientMetrics.CurryWithId(clientName), cfg, limits.MaxStreams, limits.MaxLineSize.Val(), limits.MaxLineSizeTruncate, logger, markerHandler) if err != nil { return nil, fmt.Errorf("error starting queue client: %w", err) } diff --git a/component/common/loki/client/metrics.go b/component/common/loki/client/metrics.go new file mode 100644 index 000000000000..9a71cc76090a --- /dev/null +++ b/component/common/loki/client/metrics.go @@ -0,0 +1,34 @@ +package client + +import "github.com/prometheus/client_golang/prometheus" + +type QueueClientMetrics struct { + lastReadTimestamp *prometheus.GaugeVec +} + +func NewQueueClientMetrics(reg prometheus.Registerer) *QueueClientMetrics { + m := &QueueClientMetrics{ + lastReadTimestamp: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "loki_write", + Name: "last_read_timestamp", + Help: "Latest timestamp read from the WAL", + }, + []string{"id"}, + ), + } + + if reg != nil { + reg.MustRegister(m.lastReadTimestamp) + } + + return m +} + +func (m *QueueClientMetrics) CurryWithId(id string) *QueueClientMetrics { + return &QueueClientMetrics{ + lastReadTimestamp: m.lastReadTimestamp.MustCurryWith(map[string]string{ + "id": id, + }), + } +} diff --git a/component/common/loki/client/queue_client.go b/component/common/loki/client/queue_client.go index 458839db60dd..6c66fd48de24 100644 --- a/component/common/loki/client/queue_client.go +++ b/component/common/loki/client/queue_client.go @@ -150,10 +150,11 @@ func (q *queue) closeNow() { // which allows it to be injected in the wal.Watcher as a destination where to write read series and entries. As the watcher // reads from the WAL, batches are created and dispatched onto a send queue when ready to be sent. 
type queueClient struct { - metrics *Metrics - logger log.Logger - cfg Config - client *http.Client + metrics *Metrics + qcMetrics *QueueClientMetrics + logger log.Logger + cfg Config + client *http.Client batches map[string]*batch batchesMtx sync.Mutex @@ -180,14 +181,14 @@ type queueClient struct { } // NewQueue creates a new queueClient. -func NewQueue(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger, markerHandler MarkerHandler) (StoppableWriteTo, error) { +func NewQueue(metrics *Metrics, queueClientMetrics *QueueClientMetrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger, markerHandler MarkerHandler) (StoppableWriteTo, error) { if cfg.StreamLagLabels.String() != "" { return nil, fmt.Errorf("client config stream_lag_labels is deprecated and the associated metric has been removed, stream_lag_labels: %+v", cfg.StreamLagLabels.String()) } - return newQueueClient(metrics, cfg, maxStreams, maxLineSize, maxLineSizeTruncate, logger, markerHandler) + return newQueueClient(metrics, queueClientMetrics, cfg, maxStreams, maxLineSize, maxLineSizeTruncate, logger, markerHandler) } -func newQueueClient(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger, markerHandler MarkerHandler) (*queueClient, error) { +func newQueueClient(metrics *Metrics, qcMetrics *QueueClientMetrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger, markerHandler MarkerHandler) (*queueClient, error) { if cfg.URL.URL == nil { return nil, errors.New("client needs target URL") } @@ -198,6 +199,7 @@ func newQueueClient(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, m logger: log.With(logger, "component", "client", "host", cfg.URL.Host), cfg: cfg, metrics: metrics, + qcMetrics: qcMetrics, drainTimeout: cfg.Queue.DrainTimeout, quit: make(chan struct{}), @@ -283,9 +285,13 @@ func (c *queueClient) AppendEntries(entries 
wal.RefEntries, segment int) error { c.seriesLock.RLock() l, ok := c.series[entries.Ref] c.seriesLock.RUnlock() + var maxSeenTimestamp int64 = -1 if ok { for _, e := range entries.Entries { c.appendSingleEntry(segment, l, e) + if e.Timestamp.Unix() > maxSeenTimestamp { + maxSeenTimestamp = e.Timestamp.Unix() + } } // count all enqueued appended entries as received from WAL c.markerHandler.UpdateReceivedData(segment, len(entries.Entries)) @@ -293,6 +299,12 @@ func (c *queueClient) AppendEntries(entries wal.RefEntries, segment int) error { // TODO(thepalbi): Add metric here level.Debug(c.logger).Log("msg", "series for entry not found") } + + // emit metric tracking max seen timestamp in WAL entry + if maxSeenTimestamp != -1 { + c.qcMetrics.lastReadTimestamp.WithLabelValues().Set(float64(maxSeenTimestamp)) + } + return nil } diff --git a/component/common/loki/client/queue_client_test.go b/component/common/loki/client/queue_client_test.go index a23804d44634..f45a48dc3243 100644 --- a/component/common/loki/client/queue_client_test.go +++ b/component/common/loki/client/queue_client_test.go @@ -136,7 +136,8 @@ func TestQueueClient(t *testing.T) { logger := log.NewLogfmtLogger(os.Stdout) m := NewMetrics(reg) - qc, err := NewQueue(m, cfg, 0, 0, false, logger, nilMarkerHandler{}) + qcm := NewQueueClientMetrics(reg) + qc, err := NewQueue(m, qcm, cfg, 0, 0, false, logger, nilMarkerHandler{}) require.NoError(t, err) //labels := model.LabelSet{"app": "test"} @@ -282,7 +283,8 @@ func runQueueClientBenchCase(b *testing.B, bc testCase, mhFactory func(t *testin logger := log.NewLogfmtLogger(os.Stdout) m := NewMetrics(reg) - qc, err := NewQueue(m, cfg, 0, 0, false, logger, mhFactory(b)) + qcm := NewQueueClientMetrics(reg) + qc, err := NewQueue(m, qcm, cfg, 0, 0, false, logger, mhFactory(b)) require.NoError(b, err) //labels := model.LabelSet{"app": "test"} diff --git a/component/common/loki/wal/watcher_metrics.go b/component/common/loki/wal/watcher_metrics.go index 
4064f8b22aac..ecdd54a60a83 100644 --- a/component/common/loki/wal/watcher_metrics.go +++ b/component/common/loki/wal/watcher_metrics.go @@ -10,6 +10,7 @@ type WatcherMetrics struct { currentSegment *prometheus.GaugeVec replaySegment *prometheus.GaugeVec watchersRunning *prometheus.GaugeVec + lastReadTimestamp *prometheus.GaugeVec } func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics { @@ -86,6 +87,7 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics { m.segmentRead = mustRegisterOrGet(reg, m.segmentRead).(*prometheus.CounterVec) m.currentSegment = mustRegisterOrGet(reg, m.currentSegment).(*prometheus.GaugeVec) m.watchersRunning = mustRegisterOrGet(reg, m.watchersRunning).(*prometheus.GaugeVec) + m.lastReadTimestamp = mustRegisterOrGet(reg, m.lastReadTimestamp).(*prometheus.GaugeVec) } return m diff --git a/component/common/loki/wal/writer.go b/component/common/loki/wal/writer.go index 929199529c5d..e71773d944d6 100644 --- a/component/common/loki/wal/writer.go +++ b/component/common/loki/wal/writer.go @@ -59,6 +59,7 @@ type Writer struct { reclaimedOldSegmentsSpaceCounter *prometheus.CounterVec lastReclaimedSegment *prometheus.GaugeVec + lastWrittenTimestamp *prometheus.GaugeVec closeCleaner chan struct{} } @@ -96,10 +97,17 @@ func NewWriter(walCfg Config, logger log.Logger, reg prometheus.Registerer) (*Wr Name: "last_reclaimed_segment", Help: "Last reclaimed segment number", }, []string{}) + wrt.lastWrittenTimestamp = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "loki_write", + Subsystem: "wal_writer", + Name: "last_written_timestamp", + Help: "Latest timestamp that was written to the WAL", + }, []string{}) if reg != nil { _ = reg.Register(wrt.reclaimedOldSegmentsSpaceCounter) _ = reg.Register(wrt.lastReclaimedSegment) + _ = reg.Register(wrt.lastWrittenTimestamp) } wrt.start(walCfg.MaxSegmentAge) @@ -118,6 +126,9 @@ func (wrt *Writer) start(maxSegmentAge time.Duration) { continue } + // emit metric with latest written 
timestamp, to be able to track delay from writer to watcher + wrt.lastWrittenTimestamp.WithLabelValues().Set(float64(e.Timestamp.Unix())) + wrt.writeSubscribersLock.RLock() for _, s := range wrt.writeSubscribers { s.NotifyWrite() From d12d8124baf0ed42a6ac98e1ea5082ec5c4e4f30 Mon Sep 17 00:00:00 2001 From: Pablo Balbi Date: Fri, 3 Nov 2023 13:21:38 -0300 Subject: [PATCH 2/6] post-rebase fix --- component/common/loki/client/client.go | 29 +++++++------------- component/common/loki/client/metrics.go | 7 +++-- component/common/loki/wal/watcher_metrics.go | 29 +++++++------------- pkg/util/metrics.go | 16 +++++++++++ 4 files changed, 41 insertions(+), 40 deletions(-) create mode 100644 pkg/util/metrics.go diff --git a/component/common/loki/client/client.go b/component/common/loki/client/client.go index 208e7c45900c..b2469b93b885 100644 --- a/component/common/loki/client/client.go +++ b/component/common/loki/client/client.go @@ -6,6 +6,7 @@ import ( "context" "errors" "fmt" + "github.com/grafana/agent/pkg/util" "io" "net/http" "strconv" @@ -117,30 +118,20 @@ func NewMetrics(reg prometheus.Registerer) *Metrics { } if reg != nil { - m.encodedBytes = mustRegisterOrGet(reg, m.encodedBytes).(*prometheus.CounterVec) - m.sentBytes = mustRegisterOrGet(reg, m.sentBytes).(*prometheus.CounterVec) - m.droppedBytes = mustRegisterOrGet(reg, m.droppedBytes).(*prometheus.CounterVec) - m.sentEntries = mustRegisterOrGet(reg, m.sentEntries).(*prometheus.CounterVec) - m.droppedEntries = mustRegisterOrGet(reg, m.droppedEntries).(*prometheus.CounterVec) - m.mutatedEntries = mustRegisterOrGet(reg, m.mutatedEntries).(*prometheus.CounterVec) - m.mutatedBytes = mustRegisterOrGet(reg, m.mutatedBytes).(*prometheus.CounterVec) - m.requestDuration = mustRegisterOrGet(reg, m.requestDuration).(*prometheus.HistogramVec) - m.batchRetries = mustRegisterOrGet(reg, m.batchRetries).(*prometheus.CounterVec) + m.encodedBytes = util.MustRegisterOrGet(reg, m.encodedBytes).(*prometheus.CounterVec) + m.sentBytes = 
util.MustRegisterOrGet(reg, m.sentBytes).(*prometheus.CounterVec) + m.droppedBytes = util.MustRegisterOrGet(reg, m.droppedBytes).(*prometheus.CounterVec) + m.sentEntries = util.MustRegisterOrGet(reg, m.sentEntries).(*prometheus.CounterVec) + m.droppedEntries = util.MustRegisterOrGet(reg, m.droppedEntries).(*prometheus.CounterVec) + m.mutatedEntries = util.MustRegisterOrGet(reg, m.mutatedEntries).(*prometheus.CounterVec) + m.mutatedBytes = util.MustRegisterOrGet(reg, m.mutatedBytes).(*prometheus.CounterVec) + m.requestDuration = util.MustRegisterOrGet(reg, m.requestDuration).(*prometheus.HistogramVec) + m.batchRetries = util.MustRegisterOrGet(reg, m.batchRetries).(*prometheus.CounterVec) } return &m } -func mustRegisterOrGet(reg prometheus.Registerer, c prometheus.Collector) prometheus.Collector { - if err := reg.Register(c); err != nil { - if are, ok := err.(prometheus.AlreadyRegisteredError); ok { - return are.ExistingCollector - } - panic(err) - } - return c -} - // Client pushes entries to Loki and can be stopped type Client interface { loki.EntryHandler diff --git a/component/common/loki/client/metrics.go b/component/common/loki/client/metrics.go index 9a71cc76090a..6d32bf1ce459 100644 --- a/component/common/loki/client/metrics.go +++ b/component/common/loki/client/metrics.go @@ -1,6 +1,9 @@ package client -import "github.com/prometheus/client_golang/prometheus" +import ( + "github.com/grafana/agent/pkg/util" + "github.com/prometheus/client_golang/prometheus" +) type QueueClientMetrics struct { lastReadTimestamp *prometheus.GaugeVec @@ -19,7 +22,7 @@ func NewQueueClientMetrics(reg prometheus.Registerer) *QueueClientMetrics { } if reg != nil { - reg.MustRegister(m.lastReadTimestamp) + m.lastReadTimestamp = util.MustRegisterOrGet(reg, m.lastReadTimestamp).(*prometheus.GaugeVec) } return m diff --git a/component/common/loki/wal/watcher_metrics.go b/component/common/loki/wal/watcher_metrics.go index ecdd54a60a83..ce8052fd442d 100644 --- 
a/component/common/loki/wal/watcher_metrics.go +++ b/component/common/loki/wal/watcher_metrics.go @@ -1,6 +1,9 @@ package wal -import "github.com/prometheus/client_golang/prometheus" +import ( + "github.com/grafana/agent/pkg/util" + "github.com/prometheus/client_golang/prometheus" +) type WatcherMetrics struct { recordsRead *prometheus.CounterVec @@ -10,7 +13,6 @@ type WatcherMetrics struct { currentSegment *prometheus.GaugeVec replaySegment *prometheus.GaugeVec watchersRunning *prometheus.GaugeVec - lastReadTimestamp *prometheus.GaugeVec } func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics { @@ -81,24 +83,13 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics { } if reg != nil { - m.recordsRead = mustRegisterOrGet(reg, m.recordsRead).(*prometheus.CounterVec) - m.recordDecodeFails = mustRegisterOrGet(reg, m.recordDecodeFails).(*prometheus.CounterVec) - m.droppedWriteNotifications = mustRegisterOrGet(reg, m.droppedWriteNotifications).(*prometheus.CounterVec) - m.segmentRead = mustRegisterOrGet(reg, m.segmentRead).(*prometheus.CounterVec) - m.currentSegment = mustRegisterOrGet(reg, m.currentSegment).(*prometheus.GaugeVec) - m.watchersRunning = mustRegisterOrGet(reg, m.watchersRunning).(*prometheus.GaugeVec) - m.lastReadTimestamp = mustRegisterOrGet(reg, m.lastReadTimestamp).(*prometheus.GaugeVec) + m.recordsRead = util.MustRegisterOrGet(reg, m.recordsRead).(*prometheus.CounterVec) + m.recordDecodeFails = util.MustRegisterOrGet(reg, m.recordDecodeFails).(*prometheus.CounterVec) + m.droppedWriteNotifications = util.MustRegisterOrGet(reg, m.droppedWriteNotifications).(*prometheus.CounterVec) + m.segmentRead = util.MustRegisterOrGet(reg, m.segmentRead).(*prometheus.CounterVec) + m.currentSegment = util.MustRegisterOrGet(reg, m.currentSegment).(*prometheus.GaugeVec) + m.watchersRunning = util.MustRegisterOrGet(reg, m.watchersRunning).(*prometheus.GaugeVec) } return m } - -func mustRegisterOrGet(reg prometheus.Registerer, c 
prometheus.Collector) prometheus.Collector { - if err := reg.Register(c); err != nil { - if are, ok := err.(prometheus.AlreadyRegisteredError); ok { - return are.ExistingCollector - } - panic(err) - } - return c -} diff --git a/pkg/util/metrics.go b/pkg/util/metrics.go new file mode 100644 index 000000000000..850b535fe8fb --- /dev/null +++ b/pkg/util/metrics.go @@ -0,0 +1,16 @@ +package util + +import "github.com/prometheus/client_golang/prometheus" + +// MustRegisterOrGet attempts to register the supplied collector with the given registerer. If the collector is already +// registered, the existing collector is returned instead. +// If registration fails with any error other than prometheus.AlreadyRegisteredError, this function panics. +func MustRegisterOrGet(reg prometheus.Registerer, c prometheus.Collector) prometheus.Collector { + if err := reg.Register(c); err != nil { + if are, ok := err.(prometheus.AlreadyRegisteredError); ok { + return are.ExistingCollector + } + panic(err) + } + return c +} From cb8ccfb4027a9ff1891f3e4d690f10e1d00b9fc6 Mon Sep 17 00:00:00 2001 From: Pablo Balbi Date: Fri, 3 Nov 2023 13:23:18 -0300 Subject: [PATCH 3/6] linter fix --- component/common/loki/client/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/component/common/loki/client/client.go b/component/common/loki/client/client.go index b2469b93b885..556430a7d097 100644 --- a/component/common/loki/client/client.go +++ b/component/common/loki/client/client.go @@ -6,7 +6,6 @@ import ( "context" "errors" "fmt" - "github.com/grafana/agent/pkg/util" "io" "net/http" "strconv" @@ -22,6 +21,7 @@ import ( "github.com/grafana/agent/component/common/loki" "github.com/grafana/agent/pkg/build" + "github.com/grafana/agent/pkg/util" lokiutil "github.com/grafana/loki/pkg/util" ) From 3453136e1f3019bae813ed682d947d131f67a3c1 Mon Sep 17 00:00:00 2001 From: Pablo Balbi Date: Mon, 6 Nov 2023 11:04:46 -0300 Subject: [PATCH 4/6] fix linter --- component/common/loki/client/client.go | 1 - 1 file changed, 1 
deletion(-) diff --git a/component/common/loki/client/client.go b/component/common/loki/client/client.go index 556430a7d097..964b6c98513b 100644 --- a/component/common/loki/client/client.go +++ b/component/common/loki/client/client.go @@ -59,7 +59,6 @@ type Metrics struct { mutatedBytes *prometheus.CounterVec requestDuration *prometheus.HistogramVec batchRetries *prometheus.CounterVec - lastReadTimestamp *prometheus.GaugeVec countersWithHost []*prometheus.CounterVec countersWithHostTenant []*prometheus.CounterVec countersWithHostTenantReason []*prometheus.CounterVec From 24e6b379205c5baa957a759a7069afc5045c40cc Mon Sep 17 00:00:00 2001 From: Pablo Balbi Date: Mon, 6 Nov 2023 11:06:48 -0300 Subject: [PATCH 5/6] fix tests --- component/common/loki/client/queue_client_test.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/component/common/loki/client/queue_client_test.go b/component/common/loki/client/queue_client_test.go index f45a48dc3243..cf59f49e1b7a 100644 --- a/component/common/loki/client/queue_client_test.go +++ b/component/common/loki/client/queue_client_test.go @@ -135,9 +135,7 @@ func TestQueueClient(t *testing.T) { logger := log.NewLogfmtLogger(os.Stdout) - m := NewMetrics(reg) - qcm := NewQueueClientMetrics(reg) - qc, err := NewQueue(m, qcm, cfg, 0, 0, false, logger, nilMarkerHandler{}) + qc, err := NewQueue(NewMetrics(reg), NewQueueClientMetrics(reg).CurryWithId("test"), cfg, 0, 0, false, logger, nilMarkerHandler{}) require.NoError(t, err) //labels := model.LabelSet{"app": "test"} @@ -282,9 +280,7 @@ func runQueueClientBenchCase(b *testing.B, bc testCase, mhFactory func(t *testin logger := log.NewLogfmtLogger(os.Stdout) - m := NewMetrics(reg) - qcm := NewQueueClientMetrics(reg) - qc, err := NewQueue(m, qcm, cfg, 0, 0, false, logger, mhFactory(b)) + qc, err := NewQueue(NewMetrics(reg), NewQueueClientMetrics(reg).CurryWithId("test"), cfg, 0, 0, false, logger, mhFactory(b)) require.NoError(b, err) //labels := model.LabelSet{"app": 
"test"} From 83c76f97cae7d7d4ae0b99beabe1ef988fb631b8 Mon Sep 17 00:00:00 2001 From: Pablo Balbi Date: Tue, 7 Nov 2023 10:15:18 -0300 Subject: [PATCH 6/6] remove unnecessary if --- component/common/loki/client/queue_client.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/component/common/loki/client/queue_client.go b/component/common/loki/client/queue_client.go index 6c66fd48de24..dc8ed469fba3 100644 --- a/component/common/loki/client/queue_client.go +++ b/component/common/loki/client/queue_client.go @@ -300,10 +300,9 @@ func (c *queueClient) AppendEntries(entries wal.RefEntries, segment int) error { level.Debug(c.logger).Log("msg", "series for entry not found") } - // emit metric tracking max seen timestamp in WAL entry - if maxSeenTimestamp != -1 { - c.qcMetrics.lastReadTimestamp.WithLabelValues().Set(float64(maxSeenTimestamp)) - } + // Callers are expected to pass at least one entry. NOTE(review): when the series lookup above misses, no entry is + // visited and maxSeenTimestamp is still its -1 sentinel, so the gauge is set to -1; confirm this is intended. + c.qcMetrics.lastReadTimestamp.WithLabelValues().Set(float64(maxSeenTimestamp)) return nil }