Skip to content

Commit

Permalink
Merge pull request #61 from WillemKauf/tombstones
Browse files Browse the repository at this point in the history
`verifier`: fix `lastConsumableOffsets`
  • Loading branch information
WillemKauf authored Dec 5, 2024
2 parents 27986ea + bffac1f commit 1b80376
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 16 deletions.
30 changes: 21 additions & 9 deletions pkg/worker/verifier/latest_value_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,29 @@ import (
)

type LatestValueMap struct {
topic string
LatestKvByPartition []map[string]string
topic string
// Latest offset for a given key
latestKoByPartition []map[string]int64
// Latest value for a given key
LatestKvByPartition []map[string][]byte
}

func (lvm *LatestValueMap) Get(partition int32, key string) (value string, exists bool) {
func (lvm *LatestValueMap) GetValue(partition int32, key string) (value []byte, exists bool) {
if partition < 0 || partition >= int32(len(lvm.LatestKvByPartition)) {
log.Panicf("Partition %d out of bounds for latestValueMap of size %d", partition, len(lvm.LatestKvByPartition))
}
value, exists = lvm.LatestKvByPartition[partition][key]
return
}

func (lvm *LatestValueMap) Insert(partition int32, key string, value string) {
func (lvm *LatestValueMap) InsertKeyValue(partition int32, key string, value []byte) {
lvm.LatestKvByPartition[partition][key] = value
}

func (lvm *LatestValueMap) InsertKeyOffset(partition int32, key string, offset int64) {
lvm.latestKoByPartition[partition][key] = offset
}

func latestValueMapFile(topic string) string {
return fmt.Sprintf("latest_value_%s.json", topic)
}
Expand Down Expand Up @@ -73,20 +80,25 @@ func LoadLatestValues(topic string, nPartitions int32) LatestValueMap {
util.Die("More partitions in latest_value_map file than in topic!")
} else if len(lvm.LatestKvByPartition) < int(nPartitions) {
// Creating new partitions is allowed
blanks := make([]map[string]string, nPartitions-int32(len(lvm.LatestKvByPartition)))
blanks := make([]map[string][]byte, nPartitions-int32(len(lvm.LatestKvByPartition)))
lvm.LatestKvByPartition = append(lvm.LatestKvByPartition, blanks...)
}
log.Infof("Successfully read latest value map")
return lvm
}

func NewLatestValueMap(topic string, nPartitions int32) LatestValueMap {
maps := make([]map[string]string, nPartitions)
for i := range maps {
maps[i] = make(map[string]string)
kvMaps := make([]map[string][]byte, nPartitions)
for i := range kvMaps {
kvMaps[i] = make(map[string][]byte)
}
koMaps := make([]map[string]int64, nPartitions)
for i := range koMaps {
koMaps[i] = make(map[string]int64)
}
return LatestValueMap{
topic: topic,
LatestKvByPartition: maps,
LatestKvByPartition: kvMaps,
latestKoByPartition: koMaps,
}
}
4 changes: 4 additions & 0 deletions pkg/worker/verifier/offset_ranges.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ func (tors *TopicOffsetRanges) Contains(p int32, o int64) bool {
return tors.PartitionRanges[p].Contains(o)
}

func (tors *TopicOffsetRanges) GetLastConsumableOffset(p int32) int64 {
return tors.LastConsumableOffsets[p]
}

func (tors *TopicOffsetRanges) SetLastConsumableOffset(p int32, o int64) {
tors.LastConsumableOffsets[p] = o
}
Expand Down
22 changes: 17 additions & 5 deletions pkg/worker/verifier/producer_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,9 @@ func (pw *ProducerWorker) OnAcked(r *kgo.Record) {
pw.Status.OnAcked(r.Partition, r.Offset)

pw.validOffsets.Insert(r.Partition, r.Offset)
if pw.config.producesTombstones && r.Value != nil {
pw.validOffsets.SetLastConsumableOffset(r.Partition, r.Offset)
}
if pw.validateLatestValues {
pw.latestValueProduced.Insert(r.Partition, string(r.Key), string(r.Value))
if pw.validateLatestValues || pw.config.producesTombstones {
pw.latestValueProduced.InsertKeyValue(r.Partition, string(r.Key), r.Value)
pw.latestValueProduced.InsertKeyOffset(r.Partition, string(r.Key), r.Offset)
}
}

Expand Down Expand Up @@ -248,6 +246,19 @@ func (self *ProducerWorkerStatus) OnFail() {
self.Fails += 1
}

func (pw *ProducerWorker) parseLastConsumableOffsets() {
for partition, keyOffset := range pw.latestValueProduced.latestKoByPartition {
for key, offset := range keyOffset {
value, exists := pw.latestValueProduced.GetValue(int32(partition), key)
if exists && value != nil {
if offset > pw.validOffsets.GetLastConsumableOffset(int32(partition)) {
pw.validOffsets.SetLastConsumableOffset(int32(partition), offset)
}
}
}
}
}

func (pw *ProducerWorker) Store() {
err := pw.validOffsets.Store()
util.Chk(err, "Error writing offset map: %v", err)
Expand Down Expand Up @@ -449,6 +460,7 @@ func (pw *ProducerWorker) produceInner(n int64) (int64, []BadOffset, error) {
client.Close()
log.Info("Closed client.")

pw.parseLastConsumableOffsets()
pw.produceCheckpoint()

if errored {
Expand Down
4 changes: 2 additions & 2 deletions pkg/worker/verifier/validator_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ func (cs *ValidatorStatus) ValidateRecord(r *kgo.Record, validRanges *TopicOffse
}

if cs.expectFullyCompacted {
latestValue, exists := latestValuesProduced.Get(r.Partition, string(r.Key))
if !exists || latestValue != string(r.Value) {
latestValue, exists := latestValuesProduced.GetValue(r.Partition, string(r.Key))
if !exists || string(latestValue) != string(r.Value) {
log.Panicf("Consumed value for key %s does not match the latest produced value in a compacted topic- did compaction for partition %s/%d occur betwen producing and consuming?", r.Key, r.Topic, r.Partition)
}
}
Expand Down

0 comments on commit 1b80376

Please sign in to comment.