Skip to content

Commit

Permalink
worker: use LatestValueProduced in workers
Browse files Browse the repository at this point in the history
As a means to verify the results for a compacted topic. Use the flag
`--validate-latest-values` to trigger validation.
  • Loading branch information
WillemKauf committed Nov 20, 2024
1 parent b670585 commit 5c5b31d
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 34 deletions.
10 changes: 6 additions & 4 deletions cmd/kgo-verifier/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ var (
tolerateDataLoss = flag.Bool("tolerate-data-loss", false, "If true, tolerate data-loss events")
tolerateFailedProduce = flag.Bool("tolerate-failed-produce", false, "If true, tolerate and retry failed produce")
tombstoneProbability = flag.Float64("tombstone-probability", 0.0, "The probability (between 0.0 and 1.0) that a record produced is a tombstone record.")
compacted = flag.Bool("compacted", false, "Whether the topic to be verified is compacted or not.")
compacted = flag.Bool("compacted", false, "Whether the topic to be verified is compacted or not. This will suppress warnings about offset gaps in consumed values.")
validateLatestValues = flag.Bool("validate-latest-values", false, "If true, values consumed by a worker will be validated against the last produced value by a producer. This value should only be set if compaction has been allowed to fully de-duplicate the entirety of the log before consuming.")
)

func makeWorkerConfig() worker.WorkerConfig {
Expand All @@ -94,6 +95,7 @@ func makeWorkerConfig() worker.WorkerConfig {
TolerateDataLoss: *tolerateDataLoss,
TolerateFailedProduce: *tolerateFailedProduce,
Continuous: *continuous,
ValidateLatestValues: *validateLatestValues,
}

return c
Expand Down Expand Up @@ -263,7 +265,7 @@ func main() {
srw := verifier.NewSeqReadWorker(verifier.NewSeqReadConfig(
makeWorkerConfig(), "sequential", nPartitions, *seqConsumeCount,
(*consumeTputMb)*1024*1024,
), verifier.NewValidatorStatus(*compacted))
), verifier.NewValidatorStatus(*compacted, *validateLatestValues, *topic, nPartitions))
workers = append(workers, &srw)

for loopState.Next() {
Expand All @@ -282,7 +284,7 @@ func main() {
workerCfg := verifier.NewRandomReadConfig(
makeWorkerConfig(), fmt.Sprintf("random-%03d", i), nPartitions, *cCount,
)
worker := verifier.NewRandomReadWorker(workerCfg, verifier.NewValidatorStatus(*compacted))
worker := verifier.NewRandomReadWorker(workerCfg, verifier.NewValidatorStatus(*compacted, *validateLatestValues, *topic, nPartitions))
randomWorkers = append(randomWorkers, &worker)
workers = append(workers, &worker)
}
Expand Down Expand Up @@ -311,7 +313,7 @@ func main() {
grw := verifier.NewGroupReadWorker(
verifier.NewGroupReadConfig(
makeWorkerConfig(), *cgName, nPartitions, *cgReaders,
*seqConsumeCount, (*consumeTputMb)*1024*1024), verifier.NewValidatorStatus(*compacted))
*seqConsumeCount, (*consumeTputMb)*1024*1024), verifier.NewValidatorStatus(*compacted, *validateLatestValues, *topic, nPartitions))
workers = append(workers, &grw)

for loopState.Next() {
Expand Down
6 changes: 5 additions & 1 deletion pkg/worker/verifier/group_read_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ func (grw *GroupReadWorker) consumerGroupReadInner(
defer client.Close()

validRanges := LoadTopicOffsetRanges(grw.config.workerCfg.Topic, grw.config.nPartitions)
var latestValuesProduced LatestValueMap
if grw.Status.Validator.expectFullyCompacted {
latestValuesProduced = LoadLatestValues(grw.config.workerCfg.Topic, grw.config.nPartitions)
}

for {
fetches := client.PollFetches(ctx)
Expand Down Expand Up @@ -271,7 +275,7 @@ func (grw *GroupReadWorker) consumerGroupReadInner(
log.Debugf(
"fiber %v: Consumer group read %s/%d o=%d...",
fiberId, grw.config.workerCfg.Topic, r.Partition, r.Offset)
grw.Status.Validator.ValidateRecord(r, &validRanges)
grw.Status.Validator.ValidateRecord(r, &validRanges, &latestValuesProduced)
// Will cancel the context if we have read everything
cgOffsets.AddRecord(ctx, r)
})
Expand Down
13 changes: 10 additions & 3 deletions pkg/worker/verifier/producer_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type ProducerWorker struct {

tolerateDataLoss bool
tolerateFailedProduce bool

validateLatestValues bool
}

func NewProducerWorker(cfg ProducerConfig) ProducerWorker {
Expand All @@ -89,6 +91,7 @@ func NewProducerWorker(cfg ProducerConfig) ProducerWorker {
churnProducers: cfg.messagesPerProducerId > 0,
tolerateDataLoss: cfg.workerCfg.TolerateDataLoss,
tolerateFailedProduce: cfg.workerCfg.TolerateFailedProduce,
validateLatestValues: cfg.workerCfg.ValidateLatestValues,
}
}

Expand Down Expand Up @@ -229,8 +232,10 @@ func (pw *ProducerWorker) Store() {
err := pw.validOffsets.Store()
util.Chk(err, "Error writing offset map: %v", err)

err = pw.latestValueProduced.Store()
util.Chk(err, "Error writing latest value map: %v", err)
if pw.validateLatestValues {
err = pw.latestValueProduced.Store()
util.Chk(err, "Error writing latest value map: %v", err)
}
}

func (pw *ProducerWorker) produceCheckpoint() {
Expand Down Expand Up @@ -394,7 +399,9 @@ func (pw *ProducerWorker) produceInner(n int64) (int64, []BadOffset, error) {
pw.Status.latency.Update(ackLatency.Microseconds())
log.Debugf("Wrote partition %d at %d", r.Partition, r.Offset)
pw.validOffsets.Insert(r.Partition, r.Offset)
pw.latestValueProduced.Insert(r.Partition, string(r.Key), string(r.Value))
if pw.validateLatestValues {
pw.latestValueProduced.Insert(r.Partition, string(r.Key), string(r.Value))
}

}
wg.Done()
Expand Down
8 changes: 6 additions & 2 deletions pkg/worker/verifier/random_read_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,18 @@ func (w *RandomReadWorker) Wait() error {
}

validRanges := LoadTopicOffsetRanges(w.config.workerCfg.Topic, w.config.nPartitions)
var latestValuesProduced LatestValueMap
if w.Status.Validator.expectFullyCompacted {
latestValuesProduced = LoadLatestValues(w.config.workerCfg.Topic, w.config.nPartitions)
}

ctxLog := log.WithFields(log.Fields{"tag": w.config.name})

readCount := w.config.readCount

// Select a partition and location
ctxLog.Infof("Reading %d random offsets", w.config.readCount)

i := 0
for i < readCount {
w.Status.Validator.ResetMonotonicityTestState()
Expand Down Expand Up @@ -146,7 +150,7 @@ func (w *RandomReadWorker) Wait() error {
if r.Partition != p {
util.Die("Wrong partition %d in read at offset %d on partition %s/%d", r.Partition, r.Offset, w.config.workerCfg.Topic, p)
}
w.Status.Validator.ValidateRecord(r, &validRanges)
w.Status.Validator.ValidateRecord(r, &validRanges, &latestValuesProduced)
})
if len(fetches.Records()) == 0 {
ctxLog.Errorf("Reloading offsets on empty response reading from partition %d at %s", p, offset)
Expand Down
6 changes: 5 additions & 1 deletion pkg/worker/verifier/seq_read_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ func (srw *SeqReadWorker) sequentialReadInner(ctx context.Context, startAt []int
offsets[srw.config.workerCfg.Topic] = partOffsets

validRanges := LoadTopicOffsetRanges(srw.config.workerCfg.Topic, srw.config.nPartitions)
var latestValuesProduced LatestValueMap
if srw.Status.Validator.expectFullyCompacted {
latestValuesProduced = LoadLatestValues(srw.config.workerCfg.Topic, srw.config.nPartitions)
}

opts := srw.config.workerCfg.MakeKgoOpts()
opts = append(opts, []kgo.Opt{
Expand Down Expand Up @@ -180,7 +184,7 @@ func (srw *SeqReadWorker) sequentialReadInner(ctx context.Context, startAt []int
if r.Attrs.IsControl() {
return
}
srw.Status.Validator.ValidateRecord(r, &validRanges)
srw.Status.Validator.ValidateRecord(r, &validRanges, &latestValuesProduced)
})

if srw.config.maxReadCount >= 0 && curReadCount >= srw.config.maxReadCount {
Expand Down
26 changes: 16 additions & 10 deletions pkg/worker/verifier/validator_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import (
"github.com/twmb/franz-go/pkg/kgo"
)

func NewValidatorStatus(compacted bool) ValidatorStatus {
func NewValidatorStatus(compacted bool, expectFullyCompacted bool, topic string, nPartitions int32) ValidatorStatus {
return ValidatorStatus{
MaxOffsetsConsumed: make(map[int32]int64),

lastCheckpoint: time.Now(),
compacted: compacted,
MaxOffsetsConsumed: make(map[int32]int64),
lastCheckpoint: time.Now(),
compacted: compacted,
expectFullyCompacted: expectFullyCompacted,
}
}

Expand Down Expand Up @@ -54,17 +54,17 @@ type ValidatorStatus struct {
// Last consumed offset per partition. Used to assert monotonicity and check for gaps.
lastOffsetConsumed map[int32]int64

// The latest offset seen for a given key. Used to help track the latest key-value pair that should be seen.
lastOffsetPerKeyConsumed map[string]int64

// Last leader epoch per partition. Used to assert monotonicity.
lastLeaderEpoch map[int32]int32

// Whether the topic to be consumed is compacted. Gaps in offsets will be ignored if true.
compacted bool

// Whether the values consumed should be verified against the last produced value for a given key in the log.
expectFullyCompacted bool
}

func (cs *ValidatorStatus) ValidateRecord(r *kgo.Record, validRanges *TopicOffsetRanges) {
func (cs *ValidatorStatus) ValidateRecord(r *kgo.Record, validRanges *TopicOffsetRanges, latestValuesProduced *LatestValueMap) {
expect_header_value := fmt.Sprintf("%06d.%018d", 0, r.Offset)
log.Debugf("Consumed %s on p=%d at o=%d leaderEpoch=%d", r.Key, r.Partition, r.Offset, r.LeaderEpoch)
cs.lock.Lock()
Expand Down Expand Up @@ -112,6 +112,13 @@ func (cs *ValidatorStatus) ValidateRecord(r *kgo.Record, validRanges *TopicOffse
log.Debugf("Read OK (%s) on p=%d at o=%d", r.Headers[0].Value, r.Partition, r.Offset)
}

if cs.expectFullyCompacted {
latestValue, exists := latestValuesProduced.Get(r.Partition, string(r.Key))
if !exists || latestValue != string(r.Value) {
log.Panicf("Consumed value for key %s does not match the latest produced value in a compacted topic- did compaction for partition %s/%d occur betwen producing and consuming?", r.Key, r.Topic, r.Partition)
}
}

cs.recordOffset(r, recordExpected)

if time.Since(cs.lastCheckpoint) > time.Second*5 {
Expand All @@ -130,7 +137,6 @@ func (cs *ValidatorStatus) recordOffset(r *kgo.Record, recordExpected bool) {
if cs.lastLeaderEpoch == nil {
cs.lastLeaderEpoch = make(map[int32]int32)
}

// We bump highest offset only for valid records.
if r.Offset > cs.MaxOffsetsConsumed[r.Partition] && recordExpected {
cs.MaxOffsetsConsumed[r.Partition] = r.Offset
Expand Down
26 changes: 13 additions & 13 deletions pkg/worker/verifier/validator_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func TestValidatorStatus_ValidateRecordHappyPath(t *testing.T) {
validator := verifier.NewValidatorStatus(false)
validator := verifier.NewValidatorStatus(false, false, "topic", 1)
validRanges := verifier.NewTopicOffsetRanges("topic", 1)
validRanges.Insert(0, 41)
validRanges.Insert(0, 42)
Expand All @@ -19,18 +19,18 @@ func TestValidatorStatus_ValidateRecordHappyPath(t *testing.T) {
Offset: 41,
LeaderEpoch: 0,
Headers: []kgo.RecordHeader{{Key: "key", Value: []byte("000000.000000000000000041")}},
}, &validRanges)
}, &validRanges, nil)

validator.ValidateRecord(&kgo.Record{
Offset: 42,
LeaderEpoch: 0,
Headers: []kgo.RecordHeader{{Key: "key", Value: []byte("000000.000000000000000042")}},
}, &validRanges)
}, &validRanges, nil)

validator.ValidateRecord(&kgo.Record{
Offset: 43,
LeaderEpoch: 1,
}, &validRanges)
}, &validRanges, nil)

assert.Equal(t, int64(2), validator.ValidReads)
assert.Equal(t, int64(0), validator.InvalidReads)
Expand All @@ -42,7 +42,7 @@ func TestValidatorStatus_ValidateRecordHappyPath(t *testing.T) {
}

func TestValidatorStatus_ValidateRecordInvalidRead(t *testing.T) {
validator := verifier.NewValidatorStatus(false)
validator := verifier.NewValidatorStatus(false, false, "topic", 1)
validRanges := verifier.NewTopicOffsetRanges("topic", 1)
validRanges.Insert(0, 41)

Expand All @@ -58,18 +58,18 @@ func TestValidatorStatus_ValidateRecordInvalidRead(t *testing.T) {
Offset: 41,
LeaderEpoch: 0,
Headers: []kgo.RecordHeader{{Key: "key", Value: []byte("000000.000000000000000040")}},
}, &validRanges)
}, &validRanges, nil)
}()
}

func TestValidatorStatus_ValidateRecordNonMonotonicOffset(t *testing.T) {
validator := verifier.NewValidatorStatus(false)
validator := verifier.NewValidatorStatus(false, false, "topic", 1)
validRanges := verifier.NewTopicOffsetRanges("topic", 1)

validator.ValidateRecord(&kgo.Record{
Offset: 41,
LeaderEpoch: 0,
}, &validRanges)
}, &validRanges, nil)

// Same offset read again.
func() {
Expand All @@ -82,7 +82,7 @@ func TestValidatorStatus_ValidateRecordNonMonotonicOffset(t *testing.T) {
validator.ValidateRecord(&kgo.Record{
Offset: 41,
LeaderEpoch: 0,
}, &validRanges)
}, &validRanges, nil)
}()

// Lower offset read after a higher offset.
Expand All @@ -96,18 +96,18 @@ func TestValidatorStatus_ValidateRecordNonMonotonicOffset(t *testing.T) {
validator.ValidateRecord(&kgo.Record{
Offset: 40,
LeaderEpoch: 0,
}, &validRanges)
}, &validRanges, nil)
}()
}

func TestValidatorStatus_ValidateRecordNonMonotonicLeaderEpoch(t *testing.T) {
validator := verifier.NewValidatorStatus(false)
validator := verifier.NewValidatorStatus(false, false, "topic", 1)
validRanges := verifier.NewTopicOffsetRanges("topic", 1)

validator.ValidateRecord(&kgo.Record{
Offset: 41,
LeaderEpoch: 1,
}, &validRanges)
}, &validRanges, nil)

func() {
defer func() {
Expand All @@ -119,7 +119,7 @@ func TestValidatorStatus_ValidateRecordNonMonotonicLeaderEpoch(t *testing.T) {
validator.ValidateRecord(&kgo.Record{
Offset: 42,
LeaderEpoch: 0,
}, &validRanges)
}, &validRanges, nil)
}()

}
1 change: 1 addition & 0 deletions pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ type WorkerConfig struct {
TolerateDataLoss bool
TolerateFailedProduce bool
Continuous bool
ValidateLatestValues bool
}

func CompressionCodecFromString(s string) (kgo.CompressionCodec, error) {
Expand Down

0 comments on commit 5c5b31d

Please sign in to comment.