Skip to content

Commit

Permalink
feat(outputs): Implement partial write errors
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan committed Nov 5, 2024
1 parent f43605a commit 5608625
Show file tree
Hide file tree
Showing 9 changed files with 422 additions and 222 deletions.
20 changes: 20 additions & 0 deletions internal/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,23 @@ func (e *FatalError) Error() string {
func (e *FatalError) Unwrap() error {
return e.Err
}

// PartialWriteError indicate that only a subset of the metrics were written
// successfully (i.e. accepted). The rejected metrics should be removed from
// the buffer without being successfully written. Please note: the metrics
// are specified as indices into the batch to be able to reference tracking
// metrics correctly.
type PartialWriteError struct {
Err error
MetricsAccept []int
MetricsReject []int
MetricsRejectErrors []error
}

func (e *PartialWriteError) Error() string {
return e.Err.Error()
}

func (e *PartialWriteError) Unwrap() error {
return e.Err
}
89 changes: 71 additions & 18 deletions models/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,73 @@ import (
)

var (
AgentMetricsWritten = selfstat.Register("agent", "metrics_written", make(map[string]string))
AgentMetricsDropped = selfstat.Register("agent", "metrics_dropped", make(map[string]string))
AgentMetricsWritten = selfstat.Register("agent", "metrics_written", make(map[string]string))
AgentMetricsRejected = selfstat.Register("agent", "metrics_rejected", make(map[string]string))
AgentMetricsDropped = selfstat.Register("agent", "metrics_dropped", make(map[string]string))

registerGob = sync.OnceFunc(func() { metric.Init() })
)

type Transaction struct {
// Batch of metrics to write
Batch []telegraf.Metric

// Accept denotes the indices of metrics that were successfully written
Accept []int
// Reject denotes the indices of metrics that were not written but should
// not be requeued
Reject []int

// Marks this transaction as valid
valid bool

// Internal state that can be used by the buffer implementation
state interface{}
}

func (tx *Transaction) AcceptAll() {
tx.Accept = make([]int, len(tx.Batch))
for i := range tx.Batch {
tx.Accept[i] = i
}
}

func (tx *Transaction) KeepAll() {}

func (tx *Transaction) InferKeep() []int {
used := make([]bool, len(tx.Batch))
for _, idx := range tx.Accept {
used[idx] = true
}
for _, idx := range tx.Reject {
used[idx] = true
}

keep := make([]int, 0, len(tx.Batch))
for i := range tx.Batch {
if !used[i] {
keep = append(keep, i)
}
}
return keep
}

type Buffer interface {
// Len returns the number of metrics currently in the buffer.
Len() int

// Add adds metrics to the buffer and returns number of dropped metrics.
Add(metrics ...telegraf.Metric) int

// Batch returns a slice containing up to batchSize of the oldest metrics not
// yet dropped. Metrics are ordered from oldest to newest in the batch. The
// batch must not be modified by the client.
Batch(batchSize int) []telegraf.Metric

// Accept marks the batch, acquired from Batch(), as successfully written.
Accept(metrics []telegraf.Metric)
// Batch starts a transaction by returning a slice of metrics up to the
// given batch-size starting from the oldest metric in the buffer. Metrics
// are ordered from oldest to newest and must not be modified by the plugin.
BeginTransaction(batchSize int) *Transaction

// Reject returns the batch, acquired from Batch(), to the buffer and marks it
// as unsent.
Reject([]telegraf.Metric)
// Flush ends a metric and persists the buffer state
EndTransaction(*Transaction)

// Stats returns the buffer statistics such as rejected, dropped and accepred metrics
// Stats returns the buffer statistics such as rejected, dropped and accepted metrics
Stats() BufferStats

// Close finalizes the buffer and closes all open resources
Expand All @@ -45,11 +86,12 @@ type Buffer interface {
// BufferStats holds common metrics used for buffer implementations.
// Implementations of Buffer should embed this struct in them.
type BufferStats struct {
MetricsAdded selfstat.Stat
MetricsWritten selfstat.Stat
MetricsDropped selfstat.Stat
BufferSize selfstat.Stat
BufferLimit selfstat.Stat
MetricsAdded selfstat.Stat
MetricsWritten selfstat.Stat
MetricsRejected selfstat.Stat
MetricsDropped selfstat.Stat
BufferSize selfstat.Stat
BufferLimit selfstat.Stat
}

// NewBuffer returns a new empty Buffer with the given capacity.
Expand Down Expand Up @@ -84,6 +126,11 @@ func NewBufferStats(name string, alias string, capacity int) BufferStats {
"metrics_written",
tags,
),
MetricsRejected: selfstat.Register(
"write",
"metrics_rejected",
tags,
),
MetricsDropped: selfstat.Register(
"write",
"metrics_dropped",
Expand Down Expand Up @@ -115,6 +162,12 @@ func (b *BufferStats) metricWritten(m telegraf.Metric) {
m.Accept()
}

func (b *BufferStats) metricRejected(m telegraf.Metric) {
AgentMetricsRejected.Incr(1)
b.MetricsRejected.Incr(1)
m.Reject()
}

func (b *BufferStats) metricDropped(m telegraf.Metric) {
AgentMetricsDropped.Incr(1)
b.MetricsDropped.Incr(1)
Expand Down
127 changes: 86 additions & 41 deletions models/buffer_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"log"
"path/filepath"
"slices"
"sort"
"sync"

"github.com/tidwall/wal"
Expand All @@ -31,6 +33,11 @@ type DiskBuffer struct {
// we have to do our best and track that the walfile "should" be empty, so that next
// write, we can remove the invalid entry (also skipping this entry if it is being read).
isEmpty bool

// The mask contains offsets of metric already removed during a previous
// transaction. Metrics at those offsets should not be contained in new
// batches.
mask []int
}

func NewDiskBuffer(name, id, path string, stats BufferStats) (*DiskBuffer, error) {
Expand Down Expand Up @@ -67,7 +74,11 @@ func (b *DiskBuffer) length() int {
if b.isEmpty {
return 0
}
// Special case for when the read index is zero, it must be empty (otherwise it would be >= 1)

return b.entries() - len(b.mask)
}

func (b *DiskBuffer) entries() int {
if b.readIndex() == 0 {
return 0
}
Expand Down Expand Up @@ -121,28 +132,33 @@ func (b *DiskBuffer) addSingleMetric(m telegraf.Metric) bool {
return false
}

func (b *DiskBuffer) Batch(batchSize int) []telegraf.Metric {
func (b *DiskBuffer) BeginTransaction(batchSize int) *Transaction {
b.Lock()
defer b.Unlock()

if b.length() == 0 {
// no metrics in the wal file, so return an empty array
return []telegraf.Metric{}
return &Transaction{}
}
b.batchFirst = b.readIndex()
var metrics []telegraf.Metric

b.batchSize = 0

metrics := make([]telegraf.Metric, 0, batchSize)
offsets := make([]int, 0, batchSize)
readIndex := b.batchFirst
endIndex := b.writeIndex()
offset := 0
for batchSize > 0 && readIndex < endIndex {
data, err := b.file.Read(readIndex)
if err != nil {
panic(err)
}
readIndex++
offset++

m, err := metric.FromBytes(data)
if slices.Contains(b.mask, offset) {
// Metric is masked by a previous write and is scheduled for removal
continue
}

// Validate that a tracking metric is from this instance of telegraf and skip ones from older instances.
// A tracking metric can be skipped here because metric.Accept() is only called once data is successfully
Expand All @@ -152,11 +168,12 @@ func (b *DiskBuffer) Batch(batchSize int) []telegraf.Metric {
// - ErrSkipTracking: means that the tracking information was unable to be found for a tracking ID.
// - Outside of range: means that the metric was guaranteed to be left over from the previous instance
// as it was here when we opened the wal file in this instance.
if errors.Is(err, metric.ErrSkipTracking) {
// could not look up tracking information for metric, skip
continue
}
m, err := metric.FromBytes(data)
if err != nil {
if errors.Is(err, metric.ErrSkipTracking) {
// could not look up tracking information for metric, skip
continue
}
// non-recoverable error in deserialization, abort
log.Printf("E! raw metric data: %v", data)
panic(err)
Expand All @@ -167,33 +184,80 @@ func (b *DiskBuffer) Batch(batchSize int) []telegraf.Metric {
}

metrics = append(metrics, m)
offsets = append(offsets, offset)
b.batchSize++
batchSize--
}
return metrics
return &Transaction{Batch: metrics, valid: true, state: offsets}
}

func (b *DiskBuffer) Accept(batch []telegraf.Metric) {
func (b *DiskBuffer) EndTransaction(tx *Transaction) {
if len(tx.Batch) == 0 {
return
}

// Ignore invalid transactions and make sure they can only be finished once
if !tx.valid {
return
}
tx.valid = false

// Get the metric offsets from the transaction
offsets := tx.state.([]int)

b.Lock()
defer b.Unlock()

if b.batchSize == 0 || len(batch) == 0 {
// nothing to accept
// Mark metrics which should be removed in the internal mask
remove := make([]int, 0, len(tx.Batch)-len(tx.Accept)-len(tx.Reject))
for _, idx := range tx.Accept {
b.metricWritten(tx.Batch[idx])
remove = append(remove, offsets[idx])
}
for _, idx := range tx.Reject {
b.metricRejected(tx.Batch[idx])
remove = append(remove, offsets[idx])
}
b.mask = append(b.mask, remove...)
sort.Ints(b.mask)

// Remove the metrics that are marked for removal from the front of the
// WAL file. All other metrics must be kept.
if len(b.mask) == 0 || b.mask[0] != 0 {
// Mask is empty or the first index is not the front of the file, so
// exit early as there is nothing to remove
return
}
for _, m := range batch {
b.metricWritten(m)

// Determine up to which index we can remove the entries from the WAL file
var removeIdx int
for i, offset := range b.mask {
if offset != i {
break
}
removeIdx = offset
}
if b.length() == len(batch) {
b.emptyFile()

// Remove the metrics in front from the WAL file
b.isEmpty = b.entries()-removeIdx-1 <= 0
if b.isEmpty {
// WAL files cannot be fully empty but need to contain at least one
// item to not throw an error
if err := b.file.TruncateFront(b.writeIndex()); err != nil {
panic(err)
}
} else {
err := b.file.TruncateFront(b.batchFirst + uint64(len(batch)))
if err != nil {
log.Printf("E! batch length: %d, batchFirst: %d, batchSize: %d", len(batch), b.batchFirst, b.batchSize)
if err := b.file.TruncateFront(b.batchFirst + uint64(removeIdx+1)); err != nil {
panic(err)
}
}

// Truncate the mask and update the relative offsets
b.mask = b.mask[:removeIdx]
for i := range b.mask {
b.mask[i] -= removeIdx
}

// check if the original end index is still valid, clear if not
if b.originalEnd < b.readIndex() {
b.originalEnd = 0
Expand All @@ -203,14 +267,6 @@ func (b *DiskBuffer) Accept(batch []telegraf.Metric) {
b.BufferSize.Set(int64(b.length()))
}

func (b *DiskBuffer) Reject(_ []telegraf.Metric) {
// very little to do here as the disk buffer retains metrics in
// the wal file until a call to accept
b.Lock()
defer b.Unlock()
b.resetBatch()
}

func (b *DiskBuffer) Stats() BufferStats {
return b.BufferStats
}
Expand Down Expand Up @@ -238,14 +294,3 @@ func (b *DiskBuffer) handleEmptyFile() {
}
b.isEmpty = false
}

func (b *DiskBuffer) emptyFile() {
if b.isEmpty || b.length() == 0 {
return
}
if err := b.file.TruncateFront(b.writeIndex() - 1); err != nil {
log.Printf("E! writeIndex: %d, buffer len: %d", b.writeIndex(), b.length())
panic(err)
}
b.isEmpty = true
}
10 changes: 5 additions & 5 deletions models/buffer_disk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ func TestDiskBufferRetainsTrackingInformation(t *testing.T) {
defer buf.Close()

buf.Add(mm)

batch := buf.Batch(1)
buf.Accept(batch)
tx := buf.BeginTransaction(1)
tx.AcceptAll()
buf.EndTransaction(tx)
require.Equal(t, 1, delivered)
}

Expand Down Expand Up @@ -85,11 +85,11 @@ func TestDiskBufferTrackingDroppedFromOldWal(t *testing.T) {
buf.Stats().MetricsDropped.Set(0)
defer buf.Close()

batch := buf.Batch(4)
tx := buf.BeginTransaction(4)

// Check that the tracking metric is skipped
expected := []telegraf.Metric{
metrics[0], metrics[1], metrics[2], metrics[4],
}
testutil.RequireMetricsEqual(t, expected, batch)
testutil.RequireMetricsEqual(t, expected, tx.Batch)
}
Loading

0 comments on commit 5608625

Please sign in to comment.