verifier: allow to tolerate data loss
This mode allows the verifier to run against redpanda when write caching
is enabled. In addition to tolerating data loss, the verifier records the
number of offsets/records it considers lost and exports it through the
/status endpoint.
nvartolomei committed Mar 21, 2024
1 parent eca6bbe commit 1cb796e
Showing 7 changed files with 58 additions and 8 deletions.
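Taken together, the change threads one new knob through the tool: a --tolerate-data-loss CLI flag feeds worker.WorkerConfig.TolerateDataLoss, the read workers stop treating kgo.ErrDataLoss as fatal when it is set, and the validator exports a per-partition lost_offsets counter through the /status endpoint. Below is a minimal sketch of the flag-to-config wiring; it is not part of the commit and uses only names that appear in the diffs that follow (every other WorkerConfig field is elided).

    package main

    import (
        "flag"
        "fmt"

        worker "github.com/redpanda-data/kgo-verifier/pkg/worker"
    )

    func main() {
        // Mirrors cmd/kgo-verifier/main.go below: the flag value is copied into
        // the shared WorkerConfig handed to the producer and read workers.
        tolerateDataLoss := flag.Bool("tolerate-data-loss", false, "If true, tolerate data-loss events")
        flag.Parse()

        cfg := worker.WorkerConfig{TolerateDataLoss: *tolerateDataLoss}
        fmt.Printf("tolerating data loss: %v\n", cfg.TolerateDataLoss)
    }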
3 changes: 3 additions & 0 deletions cmd/kgo-verifier/main.go
@@ -68,6 +68,8 @@ var (
 
     compressionType = flag.String("compression-type", "", "One of none, gzip, snappy, lz4, zstd, or 'mixed' to pick a random codec for each producer")
     compressiblePayload = flag.Bool("compressible-payload", false, "If true, use a highly compressible payload instead of the default random payload")
+
+    tolerateDataLoss = flag.Bool("tolerate-data-loss", false, "If true, tolerate data-loss events")
 )
 
 func makeWorkerConfig() worker.WorkerConfig {
@@ -85,6 +87,7 @@ func makeWorkerConfig() worker.WorkerConfig {
         Transactions: *useTransactions,
         CompressionType: *compressionType,
         CompressiblePayload: *compressiblePayload,
+        TolerateDataLoss: *tolerateDataLoss,
         Continuous: *continuous,
     }
9 changes: 8 additions & 1 deletion pkg/worker/verifier/group_read_worker.go
@@ -2,6 +2,7 @@ package verifier
 
 import (
     "context"
+    "errors"
     "fmt"
     "os"
     "sync"
@@ -249,7 +250,13 @@ func (grw *GroupReadWorker) consumerGroupReadInner(
         log.Warnf(
             "fiber %v: Consumer group fetch %s/%d e=%v...",
             fiberId, t, p, err)
-        r_err = err
+        var lossErr *kgo.ErrDataLoss
+        if grw.config.workerCfg.TolerateDataLoss && errors.As(err, &lossErr) {
+            grw.Status.Validator.RecordLostOffsets(lossErr.Partition, lossErr.ConsumedTo-lossErr.ResetTo)
+            grw.Status.Validator.SetMonotonicityTestStateForPartition(p, lossErr.ResetTo-1)
+        } else {
+            r_err = err
+        }
     })
 
     if r_err != nil {
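Both read workers use the same classification: the fetch error is unwrapped with errors.As, and if it is franz-go's kgo.ErrDataLoss (which carries the partition plus the ConsumedTo/ResetTo offsets) and tolerance is enabled, the lost range is recorded instead of aborting. A self-contained sketch of that pattern follows; the helper name and the hand-built error are mine, for illustration only.

    package main

    import (
        "errors"
        "fmt"

        "github.com/twmb/franz-go/pkg/kgo"
    )

    // classifyFetchError is not a verifier function; it only isolates the
    // pattern used in group_read_worker.go and seq_read_worker.go above.
    func classifyFetchError(err error, tolerate bool) (lostOffsets int64, fatal error) {
        var lossErr *kgo.ErrDataLoss
        if tolerate && errors.As(err, &lossErr) {
            // The offsets between ResetTo and ConsumedTo are counted as lost.
            return lossErr.ConsumedTo - lossErr.ResetTo, nil
        }
        return 0, err
    }

    func main() {
        // In the verifier this error arrives via fetches.EachError; it is
        // built by hand here only to exercise the helper.
        err := &kgo.ErrDataLoss{Partition: 0, ConsumedTo: 120, ResetTo: 100}

        lost, fatal := classifyFetchError(err, true)
        fmt.Println(lost, fatal) // 20 <nil>

        _, fatal = classifyFetchError(err, false)
        fmt.Println(fatal != nil) // true: without the flag the error stays fatal
    }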
4 changes: 3 additions & 1 deletion pkg/worker/verifier/offset_ranges.go
@@ -42,6 +42,8 @@ type OffsetRange struct {
 
 type OffsetRanges struct {
     Ranges []OffsetRange
+
+    tolerateDataLoss bool
 }
 
 func (ors *OffsetRanges) Insert(o int64) {
@@ -57,7 +59,7 @@ func (ors *OffsetRanges) Insert(o int64) {
         last.Upper += 1
         return
     } else {
-        if o < last.Upper {
+        if o < last.Upper && !ors.tolerateDataLoss {
            // TODO: more flexible structure for out of order inserts, at the moment
            // we rely on franz-go callbacks being invoked in order.
            util.Die("Out of order offset %d (vs %d %d)", o, last.Lower, last.Upper)
20 changes: 15 additions & 5 deletions pkg/worker/verifier/producer_worker.go
@@ -65,15 +65,25 @@ type ProducerWorker struct {
     transactionSTMConfig worker.TransactionSTMConfig
     transactionSTM *worker.TransactionSTM
     churnProducers bool
+
+    tolerateDataLoss bool
 }
 
 func NewProducerWorker(cfg ProducerConfig) ProducerWorker {
+    validOffsets := LoadTopicOffsetRanges(cfg.workerCfg.Topic, cfg.nPartitions)
+    if cfg.workerCfg.TolerateDataLoss {
+        for ix := range validOffsets.PartitionRanges {
+            validOffsets.PartitionRanges[ix].tolerateDataLoss = true
+        }
+    }
+
     return ProducerWorker{
-        config: cfg,
-        Status: NewProducerWorkerStatus(cfg.workerCfg.Topic),
-        validOffsets: LoadTopicOffsetRanges(cfg.workerCfg.Topic, cfg.nPartitions),
-        payload: cfg.valueGenerator.Generate(),
-        churnProducers: cfg.messagesPerProducerId > 0,
+        config: cfg,
+        Status: NewProducerWorkerStatus(cfg.workerCfg.Topic),
+        validOffsets: validOffsets,
+        payload: cfg.valueGenerator.Generate(),
+        churnProducers: cfg.messagesPerProducerId > 0,
+        tolerateDataLoss: cfg.workerCfg.TolerateDataLoss,
     }
 }
9 changes: 8 additions & 1 deletion pkg/worker/verifier/seq_read_worker.go
@@ -2,6 +2,7 @@ package verifier
 
 import (
     "context"
+    "errors"
     "sync"
 
     worker "github.com/redpanda-data/kgo-verifier/pkg/worker"
@@ -139,7 +140,13 @@ func (srw *SeqReadWorker) sequentialReadInner(ctx context.Context, startAt []int
     var r_err error
     fetches.EachError(func(t string, p int32, err error) {
         log.Warnf("Sequential fetch %s/%d e=%v...", t, p, err)
-        r_err = err
+        var lossErr *kgo.ErrDataLoss
+        if srw.config.workerCfg.TolerateDataLoss && errors.As(err, &lossErr) {
+            srw.Status.Validator.RecordLostOffsets(lossErr.Partition, lossErr.ConsumedTo-lossErr.ResetTo)
+            srw.Status.Validator.SetMonotonicityTestStateForPartition(p, lossErr.ResetTo-1)
+        } else {
+            r_err = err
+        }
     })
 
     if r_err != nil {
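Besides counting lost offsets, both readers re-seed the monotonicity state at ResetTo-1. A small sketch of why that value is used (illustrative numbers; the check itself is paraphrased rather than copied from the verifier): consumed offsets are expected to only move forward per partition, and after a data-loss reset the next record arrives at ResetTo, below the previous high-water mark.

    package main

    import "fmt"

    func main() {
        // Illustrative numbers only: the reader had consumed up to offset 1199
        // on partition 3 when the partition was rolled back to 1150.
        lastOffsetConsumed := map[int32]int64{3: 1199}
        resetTo := int64(1150) // kgo.ErrDataLoss.ResetTo

        // Equivalent in spirit to SetMonotonicityTestStateForPartition(3, resetTo-1):
        // the watermark is rewound so the re-read is not flagged as going backwards.
        lastOffsetConsumed[3] = resetTo - 1

        nextOffset := resetTo // first record seen after the reset
        fmt.Println(nextOffset > lastOffsetConsumed[3]) // true
    }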
20 changes: 20 additions & 0 deletions pkg/worker/verifier/validator_status.go
@@ -37,6 +37,8 @@ type ValidatorStatus struct {
     // The highest valid offset consumed throughout the consumer's lifetime
     MaxOffsetsConsumed map[int32]int64 `json:"max_offsets_consumed"`
 
+    LostOffsets map[int32]int64 `json:"lost_offsets"`
+
     // Concurrent access happens when doing random reads
     // with multiple reader fibers
     lock sync.Mutex
@@ -103,13 +105,31 @@ func (cs *ValidatorStatus) recordOffset(r *kgo.Record) {
     cs.lastOffsetConsumed[r.Partition] = r.Offset
 }
 
+func (cs *ValidatorStatus) RecordLostOffsets(p int32, count int64) {
+    cs.lock.Lock()
+    defer cs.lock.Unlock()
+
+    if cs.LostOffsets == nil {
+        cs.LostOffsets = make(map[int32]int64)
+    }
+
+    cs.LostOffsets[p] += count
+}
+
 func (cs *ValidatorStatus) ResetMonotonicityTestState() {
     cs.lock.Lock()
     defer cs.lock.Unlock()
 
     cs.lastOffsetConsumed = make(map[int32]int64)
 }
 
+func (cs *ValidatorStatus) SetMonotonicityTestStateForPartition(partition int32, offset int64) {
+    cs.lock.Lock()
+    defer cs.lock.Unlock()
+
+    cs.lastOffsetConsumed[partition] = offset
+}
+
 func (cs *ValidatorStatus) Checkpoint() {
     log.Infof("Validator status: %s", cs.String())
 }
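The new LostOffsets map is what the commit message refers to as the /status export: counts accumulate per partition and serialize under lost_offsets. A trimmed, stand-alone sketch of that bookkeeping follows; the type here is not the real ValidatorStatus (locking and the other fields are omitted), and the printed JSON is illustrative rather than captured from a real /status response.

    package main

    import (
        "encoding/json"
        "fmt"
    )

    type statusSketch struct {
        LostOffsets map[int32]int64 `json:"lost_offsets"`
    }

    func (s *statusSketch) RecordLostOffsets(p int32, count int64) {
        if s.LostOffsets == nil {
            s.LostOffsets = make(map[int32]int64)
        }
        s.LostOffsets[p] += count
    }

    func main() {
        var s statusSketch
        s.RecordLostOffsets(0, 50) // e.g. ConsumedTo=1200, ResetTo=1150
        s.RecordLostOffsets(0, 7)  // a second loss event on the same partition
        s.RecordLostOffsets(3, 12)

        b, _ := json.Marshal(&s)
        fmt.Println(string(b)) // {"lost_offsets":{"0":57,"3":12}}
    }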
1 change: 1 addition & 0 deletions pkg/worker/worker.go
@@ -157,6 +157,7 @@ type WorkerConfig struct {
     // incompressible payload.
     CompressiblePayload bool
 
+    TolerateDataLoss bool
     Continuous bool
 }
