Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added last read/wrote timestamp metrics to loki.write #5699

Merged
merged 6 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 10 additions & 19 deletions component/common/loki/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/grafana/agent/component/common/loki"
"github.com/grafana/agent/pkg/build"
"github.com/grafana/agent/pkg/util"
lokiutil "github.com/grafana/loki/pkg/util"
)

Expand Down Expand Up @@ -116,30 +117,20 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {
}

if reg != nil {
m.encodedBytes = mustRegisterOrGet(reg, m.encodedBytes).(*prometheus.CounterVec)
m.sentBytes = mustRegisterOrGet(reg, m.sentBytes).(*prometheus.CounterVec)
m.droppedBytes = mustRegisterOrGet(reg, m.droppedBytes).(*prometheus.CounterVec)
m.sentEntries = mustRegisterOrGet(reg, m.sentEntries).(*prometheus.CounterVec)
m.droppedEntries = mustRegisterOrGet(reg, m.droppedEntries).(*prometheus.CounterVec)
m.mutatedEntries = mustRegisterOrGet(reg, m.mutatedEntries).(*prometheus.CounterVec)
m.mutatedBytes = mustRegisterOrGet(reg, m.mutatedBytes).(*prometheus.CounterVec)
m.requestDuration = mustRegisterOrGet(reg, m.requestDuration).(*prometheus.HistogramVec)
m.batchRetries = mustRegisterOrGet(reg, m.batchRetries).(*prometheus.CounterVec)
m.encodedBytes = util.MustRegisterOrGet(reg, m.encodedBytes).(*prometheus.CounterVec)
m.sentBytes = util.MustRegisterOrGet(reg, m.sentBytes).(*prometheus.CounterVec)
m.droppedBytes = util.MustRegisterOrGet(reg, m.droppedBytes).(*prometheus.CounterVec)
m.sentEntries = util.MustRegisterOrGet(reg, m.sentEntries).(*prometheus.CounterVec)
m.droppedEntries = util.MustRegisterOrGet(reg, m.droppedEntries).(*prometheus.CounterVec)
m.mutatedEntries = util.MustRegisterOrGet(reg, m.mutatedEntries).(*prometheus.CounterVec)
m.mutatedBytes = util.MustRegisterOrGet(reg, m.mutatedBytes).(*prometheus.CounterVec)
m.requestDuration = util.MustRegisterOrGet(reg, m.requestDuration).(*prometheus.HistogramVec)
m.batchRetries = util.MustRegisterOrGet(reg, m.batchRetries).(*prometheus.CounterVec)
}

return &m
}

// mustRegisterOrGet registers c on reg and returns the collector that is
// effectively registered: c itself on success, or the previously registered
// collector when reg reports an AlreadyRegisteredError. Any other registration
// error causes a panic.
func mustRegisterOrGet(reg prometheus.Registerer, c prometheus.Collector) prometheus.Collector {
	err := reg.Register(c)
	if err == nil {
		return c
	}

	// Only the "already registered" case is recoverable; hand back the
	// collector that won the registration so callers share it.
	already, isDuplicate := err.(prometheus.AlreadyRegisteredError)
	if !isDuplicate {
		panic(err)
	}
	return already.ExistingCollector
}

// Client pushes entries to Loki and can be stopped
type Client interface {
loki.EntryHandler
Expand Down
3 changes: 2 additions & 1 deletion component/common/loki/client/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr

walWatcherMetrics := wal.NewWatcherMetrics(reg)
walMarkerMetrics := internal.NewMarkerMetrics(reg)
queueClientMetrics := NewQueueClientMetrics(reg)

if len(clientCfgs) == 0 {
return nil, fmt.Errorf("at least one client config must be provided")
Expand Down Expand Up @@ -98,7 +99,7 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr
}
markerHandler := internal.NewMarkerHandler(markerFileHandler, walCfg.MaxSegmentAge, logger, walMarkerMetrics.WithCurriedId(clientName))

queue, err := NewQueue(metrics, cfg, limits.MaxStreams, limits.MaxLineSize.Val(), limits.MaxLineSizeTruncate, logger, markerHandler)
queue, err := NewQueue(metrics, queueClientMetrics.CurryWithId(clientName), cfg, limits.MaxStreams, limits.MaxLineSize.Val(), limits.MaxLineSizeTruncate, logger, markerHandler)
if err != nil {
return nil, fmt.Errorf("error starting queue client: %w", err)
}
Expand Down
37 changes: 37 additions & 0 deletions component/common/loki/client/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package client

import (
"github.com/grafana/agent/pkg/util"
"github.com/prometheus/client_golang/prometheus"
)

// QueueClientMetrics groups the Prometheus collectors owned by the queue client.
type QueueClientMetrics struct {
// lastReadTimestamp is the gauge vector behind the "last_read_timestamp"
// metric, described by its help text as the latest timestamp read from the WAL.
lastReadTimestamp *prometheus.GaugeVec
}

// NewQueueClientMetrics builds the queue client metrics. When reg is non-nil
// the collectors are registered on it; if an equal collector was already
// registered, the existing one is reused. With a nil reg the collectors are
// created but left unregistered.
func NewQueueClientMetrics(reg prometheus.Registerer) *QueueClientMetrics {
	lastRead := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "loki_write",
			Name:      "last_read_timestamp",
			Help:      "Latest timestamp read from the WAL",
		},
		[]string{"id"},
	)

	// Nothing to register against: hand back the fresh, unregistered gauge.
	if reg == nil {
		return &QueueClientMetrics{lastReadTimestamp: lastRead}
	}

	registered := util.MustRegisterOrGet(reg, lastRead).(*prometheus.GaugeVec)
	return &QueueClientMetrics{lastReadTimestamp: registered}
}

// CurryWithId returns a new QueueClientMetrics whose collectors have the "id"
// label pre-bound to the given value. The receiver is left untouched.
func (m *QueueClientMetrics) CurryWithId(id string) *QueueClientMetrics {
	boundLabels := map[string]string{"id": id}
	curried := m.lastReadTimestamp.MustCurryWith(boundLabels)

	return &QueueClientMetrics{lastReadTimestamp: curried}
}
25 changes: 18 additions & 7 deletions component/common/loki/client/queue_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,11 @@ func (q *queue) closeNow() {
// which allows it to be injected in the wal.Watcher as a destination where to write read series and entries. As the watcher
// reads from the WAL, batches are created and dispatched onto a send queue when ready to be sent.
type queueClient struct {
metrics *Metrics
logger log.Logger
cfg Config
client *http.Client
metrics *Metrics
qcMetrics *QueueClientMetrics
logger log.Logger
cfg Config
client *http.Client

batches map[string]*batch
batchesMtx sync.Mutex
Expand All @@ -180,14 +181,14 @@ type queueClient struct {
}

// NewQueue creates a new queueClient.
func NewQueue(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger, markerHandler MarkerHandler) (StoppableWriteTo, error) {
func NewQueue(metrics *Metrics, queueClientMetrics *QueueClientMetrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger, markerHandler MarkerHandler) (StoppableWriteTo, error) {
if cfg.StreamLagLabels.String() != "" {
return nil, fmt.Errorf("client config stream_lag_labels is deprecated and the associated metric has been removed, stream_lag_labels: %+v", cfg.StreamLagLabels.String())
}
return newQueueClient(metrics, cfg, maxStreams, maxLineSize, maxLineSizeTruncate, logger, markerHandler)
return newQueueClient(metrics, queueClientMetrics, cfg, maxStreams, maxLineSize, maxLineSizeTruncate, logger, markerHandler)
}

func newQueueClient(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger, markerHandler MarkerHandler) (*queueClient, error) {
func newQueueClient(metrics *Metrics, qcMetrics *QueueClientMetrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger, markerHandler MarkerHandler) (*queueClient, error) {
if cfg.URL.URL == nil {
return nil, errors.New("client needs target URL")
}
Expand All @@ -198,6 +199,7 @@ func newQueueClient(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, m
logger: log.With(logger, "component", "client", "host", cfg.URL.Host),
cfg: cfg,
metrics: metrics,
qcMetrics: qcMetrics,
drainTimeout: cfg.Queue.DrainTimeout,
quit: make(chan struct{}),

Expand Down Expand Up @@ -283,16 +285,25 @@ func (c *queueClient) AppendEntries(entries wal.RefEntries, segment int) error {
c.seriesLock.RLock()
l, ok := c.series[entries.Ref]
c.seriesLock.RUnlock()
var maxSeenTimestamp int64 = -1
if ok {
for _, e := range entries.Entries {
c.appendSingleEntry(segment, l, e)
if e.Timestamp.Unix() > maxSeenTimestamp {
maxSeenTimestamp = e.Timestamp.Unix()
}
}
// count all enqueued appended entries as received from WAL
c.markerHandler.UpdateReceivedData(segment, len(entries.Entries))
} else {
// TODO(thepalbi): Add metric here
level.Debug(c.logger).Log("msg", "series for entry not found")
}

// It is assumed that every AppendEntries call carries at least one entry. Note that when the
// series is not found above, no entry is inspected and the gauge is set to the -1 sentinel.
c.qcMetrics.lastReadTimestamp.WithLabelValues().Set(float64(maxSeenTimestamp))

return nil
}

Expand Down
6 changes: 2 additions & 4 deletions component/common/loki/client/queue_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ func TestQueueClient(t *testing.T) {

logger := log.NewLogfmtLogger(os.Stdout)

m := NewMetrics(reg)
qc, err := NewQueue(m, cfg, 0, 0, false, logger, nilMarkerHandler{})
qc, err := NewQueue(NewMetrics(reg), NewQueueClientMetrics(reg).CurryWithId("test"), cfg, 0, 0, false, logger, nilMarkerHandler{})
require.NoError(t, err)

//labels := model.LabelSet{"app": "test"}
Expand Down Expand Up @@ -281,8 +280,7 @@ func runQueueClientBenchCase(b *testing.B, bc testCase, mhFactory func(t *testin

logger := log.NewLogfmtLogger(os.Stdout)

m := NewMetrics(reg)
qc, err := NewQueue(m, cfg, 0, 0, false, logger, mhFactory(b))
qc, err := NewQueue(NewMetrics(reg), NewQueueClientMetrics(reg).CurryWithId("test"), cfg, 0, 0, false, logger, mhFactory(b))
require.NoError(b, err)

//labels := model.LabelSet{"app": "test"}
Expand Down
27 changes: 10 additions & 17 deletions component/common/loki/wal/watcher_metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package wal

import "github.com/prometheus/client_golang/prometheus"
import (
"github.com/grafana/agent/pkg/util"
"github.com/prometheus/client_golang/prometheus"
)

type WatcherMetrics struct {
recordsRead *prometheus.CounterVec
Expand Down Expand Up @@ -80,23 +83,13 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
}

if reg != nil {
m.recordsRead = mustRegisterOrGet(reg, m.recordsRead).(*prometheus.CounterVec)
m.recordDecodeFails = mustRegisterOrGet(reg, m.recordDecodeFails).(*prometheus.CounterVec)
m.droppedWriteNotifications = mustRegisterOrGet(reg, m.droppedWriteNotifications).(*prometheus.CounterVec)
m.segmentRead = mustRegisterOrGet(reg, m.segmentRead).(*prometheus.CounterVec)
m.currentSegment = mustRegisterOrGet(reg, m.currentSegment).(*prometheus.GaugeVec)
m.watchersRunning = mustRegisterOrGet(reg, m.watchersRunning).(*prometheus.GaugeVec)
m.recordsRead = util.MustRegisterOrGet(reg, m.recordsRead).(*prometheus.CounterVec)
m.recordDecodeFails = util.MustRegisterOrGet(reg, m.recordDecodeFails).(*prometheus.CounterVec)
m.droppedWriteNotifications = util.MustRegisterOrGet(reg, m.droppedWriteNotifications).(*prometheus.CounterVec)
m.segmentRead = util.MustRegisterOrGet(reg, m.segmentRead).(*prometheus.CounterVec)
m.currentSegment = util.MustRegisterOrGet(reg, m.currentSegment).(*prometheus.GaugeVec)
m.watchersRunning = util.MustRegisterOrGet(reg, m.watchersRunning).(*prometheus.GaugeVec)
}

return m
}

// mustRegisterOrGet tries to register c on reg. It returns c when the
// registration succeeds, the already-registered collector when reg answers
// with an AlreadyRegisteredError, and panics on any other error.
func mustRegisterOrGet(reg prometheus.Registerer, c prometheus.Collector) prometheus.Collector {
	registerErr := reg.Register(c)
	if registerErr == nil {
		return c
	}

	switch typed := registerErr.(type) {
	case prometheus.AlreadyRegisteredError:
		// Reuse the collector that was registered first.
		return typed.ExistingCollector
	default:
		panic(registerErr)
	}
}
11 changes: 11 additions & 0 deletions component/common/loki/wal/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type Writer struct {

reclaimedOldSegmentsSpaceCounter *prometheus.CounterVec
lastReclaimedSegment *prometheus.GaugeVec
lastWrittenTimestamp *prometheus.GaugeVec

closeCleaner chan struct{}
}
Expand Down Expand Up @@ -96,10 +97,17 @@ func NewWriter(walCfg Config, logger log.Logger, reg prometheus.Registerer) (*Wr
Name: "last_reclaimed_segment",
Help: "Last reclaimed segment number",
}, []string{})
wrt.lastWrittenTimestamp = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "loki_write",
Subsystem: "wal_writer",
Name: "last_written_timestamp",
Help: "Latest timestamp that was written to the WAL",
}, []string{})

if reg != nil {
_ = reg.Register(wrt.reclaimedOldSegmentsSpaceCounter)
_ = reg.Register(wrt.lastReclaimedSegment)
_ = reg.Register(wrt.lastWrittenTimestamp)
}

wrt.start(walCfg.MaxSegmentAge)
Expand All @@ -118,6 +126,9 @@ func (wrt *Writer) start(maxSegmentAge time.Duration) {
continue
}

// emit metric with latest written timestamp, to be able to track delay from writer to watcher
wrt.lastWrittenTimestamp.WithLabelValues().Set(float64(e.Timestamp.Unix()))

wrt.writeSubscribersLock.RLock()
for _, s := range wrt.writeSubscribers {
s.NotifyWrite()
Expand Down
16 changes: 16 additions & 0 deletions pkg/util/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package util

import "github.com/prometheus/client_golang/prometheus"

// MustRegisterOrGet will attempt to register the supplied collector into the register. If it's already
// registered, it will return that one.
// In case that the register procedure fails with something other than already registered, this will panic.
func MustRegisterOrGet(reg prometheus.Registerer, c prometheus.Collector) prometheus.Collector {
thepalbi marked this conversation as resolved.
Show resolved Hide resolved
if err := reg.Register(c); err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
return are.ExistingCollector
}
panic(err)
}
return c
}