Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
michae2 committed Dec 20, 2024
1 parent ff0001d commit 939f5ff
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions pkg/sql/row/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,18 +398,18 @@ func (ru *Updater) UpdateRow(
oldEntry, newEntry := &oldEntries[oldIdx], &newEntries[newIdx]
if oldEntry.Family == newEntry.Family {
// If the families are equal, then check if the keys have changed. If so, delete the old key.
// Then, issue a CPut for the new value of the key if the value has changed.
// Then, issue a CPut for the new key or a Put if only the value has changed.
// Because the indexes will always have a k/v for family 0, it suffices to only
// add foreign key checks in this case, because we are guaranteed to enter here.
oldIdx++
newIdx++
var expValue []byte
var sameKey bool
if !bytes.Equal(oldEntry.Key, newEntry.Key) {
if err := ru.Helper.deleteIndexEntry(ctx, batch, index, ru.Helper.secIndexValDirs[i], oldEntry, traceKV); err != nil {
return nil, err
}
} else if !newEntry.Value.EqualTagAndData(oldEntry.Value) {
expValue = oldEntry.Value.TagAndDataBytes()
sameKey = true
} else if !index.IsTemporaryIndexForBackfill() {
// If this is a temporary index for backfill, we want to make sure we write out all
// index values even in the case where they should be the same. We do this because the
Expand All @@ -428,13 +428,17 @@ func (ru *Updater) UpdateRow(
if traceKV {
k := keys.PrettyPrint(ru.Helper.secIndexValDirs[i], newEntry.Key)
v := newEntry.Value.PrettyPrint()
if expValue != nil {
log.VEventf(ctx, 2, "CPut %s -> %v (replacing %v, if exists)", k, v, oldEntry.Value.PrettyPrint())
if sameKey {
log.VEventf(ctx, 2, "Put %s -> %v", k, v)
} else {
log.VEventf(ctx, 2, "CPut %s -> %v (expecting does not exist)", k, v)
}
}
batch.CPutAllowingIfNotExists(newEntry.Key, &newEntry.Value, expValue)
if sameKey {
batch.Put(newEntry.Key, &newEntry.Value)
} else {
batch.CPutAllowingIfNotExists(newEntry.Key, &newEntry.Value, nil /* expValue */)
}
}
writtenIndexes.Add(i)
} else if oldEntry.Family < newEntry.Family {
Expand Down

0 comments on commit 939f5ff

Please sign in to comment.