Skip to content

Commit

Permalink
db: add DeleteSized and corresponding compaction heuristics
Browse files Browse the repository at this point in the history
Add a new write operation and corresponding internal key kind for delete
operations that know the size of the value they're deleting. SSTable writers
aggregate these estimates into table properties that are then used to improve
compaction-picking heuristics. This is particularly useful in scenarios with
heterogeneous value sizes that are easily misestimated by existing heuristics
that rely on averages.

When a DELSIZED tombstone deletes a key, the tombstone is mutated to reflect
that the deletion has already occurred. If the size carried within the tombstone's value
was accurate, the tombstone's value is removed. If the size was not accurate,
the tombstone is mutated into an ordinary DEL tombstone which will be subject
to typical average-based heuristics.

Close #2340.
  • Loading branch information
jbowens committed May 23, 2023
1 parent 49ae545 commit 25a8e9b
Show file tree
Hide file tree
Showing 59 changed files with 3,116 additions and 527 deletions.
98 changes: 95 additions & 3 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,13 @@ type Batch struct {
// then it will only contain key kinds of IngestSST.
ingestedSSTBatch bool

// minimumFormatMajorVersion indicates the format major version required in
// order to commit this batch. If an operation requires a particular format
// major version, it ratchets the batch's minimumFormatMajorVersion. When
// the batch is committed, this is validated against the database's current
// format major version.
minimumFormatMajorVersion FormatMajorVersion

// Synchronous Apply uses the commit WaitGroup for both publishing the
// seqnum and waiting for the WAL fsync (if needed). Asynchronous
// ApplyNoSyncWait, which implies WriteOptions.Sync is true, uses the commit
Expand Down Expand Up @@ -413,6 +420,7 @@ func (b *Batch) refreshMemTableSize() {

b.countRangeDels = 0
b.countRangeKeys = 0
b.minimumFormatMajorVersion = 0
for r := b.Reader(); ; {
kind, key, value, ok := r.Next()
if !ok {
Expand All @@ -423,12 +431,22 @@ func (b *Batch) refreshMemTableSize() {
b.countRangeDels++
case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
b.countRangeKeys++
case InternalKeyKindDeleteSized:
if b.minimumFormatMajorVersion < ExperimentalFormatDeleteSized {
b.minimumFormatMajorVersion = ExperimentalFormatDeleteSized
}
case InternalKeyKindIngestSST:
if b.minimumFormatMajorVersion < FormatFlushableIngest {
b.minimumFormatMajorVersion = FormatFlushableIngest
}
// This key kind doesn't contribute to the memtable size.
continue
}
b.memTableSize += memTableEntrySize(len(key), len(value))
}
if b.countRangeKeys > 0 && b.minimumFormatMajorVersion < FormatRangeKeys {
b.minimumFormatMajorVersion = FormatRangeKeys
}
}

// Apply the operations contained in the batch to the receiver batch.
Expand Down Expand Up @@ -680,6 +698,72 @@ func (b *Batch) DeleteDeferred(keyLen int) *DeferredBatchOp {
return &b.deferredOp
}

// DeleteSized deletes key exactly as Delete does, while additionally
// recording deletedValueSize, the size of the value being deleted. Prefer
// DeleteSized when the caller expects that a single internal KV pair exists
// for the key (e.g., the key has not been overwritten recently) and the size
// of that pair's value is known.
//
// The supplied size is stored within the tombstone, where it informs the
// compaction-picking heuristics that strive to reduce space amplification in
// the LSM. By "calling its shot" in this way, the caller lets the storage
// engine estimate, and therefore reduce, space amplification more accurately.
//
// It is safe to modify the contents of the arguments after DeleteSized
// returns.
func (b *Batch) DeleteSized(key []byte, deletedValueSize uint32, _ *WriteOptions) error {
	// DeleteSizedDeferred hands back a pointer to b.deferredOp with Key sized
	// to len(key); fill it in with the caller's key bytes.
	op := b.DeleteSizedDeferred(len(key), deletedValueSize)
	copy(op.Key, key)

	// TODO(peter): Manually inline DeferredBatchOp.Finish(). Check if in a
	// later Go release this is unnecessary.
	if b.index == nil {
		// Not an indexed batch: there is nothing further to record.
		return nil
	}
	if err := b.index.Add(op.offset); err != nil {
		return err
	}
	return nil
}

// DeleteSizedDeferred adds a sized delete operation to the batch, like
// DeleteSized, but accepts only the key's length rather than a complete key
// slice. The caller encodes the key into the returned DeferredBatchOp.Key
// slice and then calls Finish() on the returned object.
func (b *Batch) DeleteSizedDeferred(keyLen int, deletedValueSize uint32) *DeferredBatchOp {
	// Ratchet the format major version this batch requires up to the one
	// that introduced sized deletes, unless it is already at least that high.
	if b.minimumFormatMajorVersion < ExperimentalFormatDeleteSized {
		b.minimumFormatMajorVersion = ExperimentalFormatDeleteSized
	}

	// The tombstone's value is the varint encoding of the sum of the key
	// length and the deleted value's size.
	var scratch [binary.MaxVarintLen64]byte
	encodedLen := binary.PutUvarint(scratch[:], uint64(deletedValueSize)+uint64(keyLen))

	// NB: In batch entries and sstable entries, values are stored as
	// varstrings. Here, the value is itself a simple varint. This results in an
	// unnecessary double layer of encoding:
	//     varint(n) varint(keyLen + deletedValueSize)
	// The first varint will always be 1-byte, since a varint-encoded uint64
	// will never exceed 128 bytes. This unnecessary extra byte and wrapping is
	// preserved to avoid special casing across the database, and in particular
	// in sstable block decoding which is performance sensitive.
	b.prepareDeferredKeyValueRecord(keyLen, encodedLen, InternalKeyKindDeleteSized)
	b.deferredOp.index = b.index
	copy(b.deferredOp.Value, scratch[:encodedLen])
	return &b.deferredOp
}

// SingleDelete adds an action to the batch that single deletes the entry for key.
// See Writer.SingleDelete for more details on the semantics of SingleDelete.
//
Expand Down Expand Up @@ -782,6 +866,9 @@ func (b *Batch) rangeKeySetDeferred(startLen, internalValueLen int) *DeferredBat

func (b *Batch) incrementRangeKeysCount() {
b.countRangeKeys++
if b.minimumFormatMajorVersion < FormatRangeKeys {
b.minimumFormatMajorVersion = FormatRangeKeys
}
if b.index != nil {
b.rangeKeys = nil
b.rangeKeysSeqNum = 0
Expand Down Expand Up @@ -895,6 +982,7 @@ func (b *Batch) ingestSST(fileNum base.FileNum) {
// is not reset because for the InternalKeyKindIngestSST the count is the
// number of sstable paths which have been added to the batch.
b.memTableSize = origMemTableSize
b.minimumFormatMajorVersion = FormatFlushableIngest
}

// Empty returns true if the batch is empty, and false otherwise.
Expand Down Expand Up @@ -1199,6 +1287,7 @@ func (b *Batch) Reset() {
b.commitStats = BatchCommitStats{}
b.commitErr = nil
b.applied.Store(false)
b.minimumFormatMajorVersion = 0
if b.data != nil {
if cap(b.data) > batchMaxRetainedSize {
// If the capacity of the buffer is larger than our maximum
Expand Down Expand Up @@ -1365,7 +1454,8 @@ func (r *BatchReader) Next() (kind InternalKeyKind, ukey []byte, value []byte, o
}
switch kind {
case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete,
InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete,
InternalKeyKindDeleteSized:
*r, value, ok = batchDecodeStr(*r)
if !ok {
return 0, nil, nil, false
Expand Down Expand Up @@ -1500,7 +1590,8 @@ func (i *batchIter) value() []byte {

switch InternalKeyKind(data[offset]) {
case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete,
InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete,
InternalKeyKindDeleteSized:
_, value, ok := batchDecodeStr(data[keyEnd:])
if !ok {
return nil
Expand Down Expand Up @@ -1944,7 +2035,8 @@ func (i *flushableBatchIter) value() base.LazyValue {
var ok bool
switch kind {
case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete,
InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete,
InternalKeyKindDeleteSized:
keyEnd := i.offsets[i.index].keyEnd
_, value, ok = batchDecodeStr(i.data[keyEnd:])
if !ok {
Expand Down
66 changes: 38 additions & 28 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func TestBatch(t *testing.T) {
type testCase struct {
kind InternalKeyKind
key, value string
valueInt uint32
}

verifyTestCases := func(b *Batch, testCases []testCase) {
Expand Down Expand Up @@ -68,31 +69,33 @@ func TestBatch(t *testing.T) {
// deferred variants. This is a consequence of these keys' more complex
// value encodings.
testCases := []testCase{
{InternalKeyKindIngestSST, encodeFileNum(1), ""},
{InternalKeyKindSet, "roses", "red"},
{InternalKeyKindSet, "violets", "blue"},
{InternalKeyKindDelete, "roses", ""},
{InternalKeyKindSingleDelete, "roses", ""},
{InternalKeyKindSet, "", ""},
{InternalKeyKindSet, "", "non-empty"},
{InternalKeyKindDelete, "", ""},
{InternalKeyKindSingleDelete, "", ""},
{InternalKeyKindSet, "grass", "green"},
{InternalKeyKindSet, "grass", "greener"},
{InternalKeyKindSet, "eleventy", strings.Repeat("!!11!", 100)},
{InternalKeyKindDelete, "nosuchkey", ""},
{InternalKeyKindSingleDelete, "nosuchkey", ""},
{InternalKeyKindSet, "binarydata", "\x00"},
{InternalKeyKindSet, "binarydata", "\xff"},
{InternalKeyKindMerge, "merge", "mergedata"},
{InternalKeyKindMerge, "merge", ""},
{InternalKeyKindMerge, "", ""},
{InternalKeyKindRangeDelete, "a", "b"},
{InternalKeyKindRangeDelete, "", ""},
{InternalKeyKindLogData, "logdata", ""},
{InternalKeyKindLogData, "", ""},
{InternalKeyKindRangeKeyDelete, "grass", "green"},
{InternalKeyKindRangeKeyDelete, "", ""},
{InternalKeyKindIngestSST, encodeFileNum(1), "", 0},
{InternalKeyKindSet, "roses", "red", 0},
{InternalKeyKindSet, "violets", "blue", 0},
{InternalKeyKindDelete, "roses", "", 0},
{InternalKeyKindSingleDelete, "roses", "", 0},
{InternalKeyKindSet, "", "", 0},
{InternalKeyKindSet, "", "non-empty", 0},
{InternalKeyKindDelete, "", "", 0},
{InternalKeyKindSingleDelete, "", "", 0},
{InternalKeyKindSet, "grass", "green", 0},
{InternalKeyKindSet, "grass", "greener", 0},
{InternalKeyKindSet, "eleventy", strings.Repeat("!!11!", 100), 0},
{InternalKeyKindDelete, "nosuchkey", "", 0},
{InternalKeyKindDeleteSized, "eleventy", string(binary.AppendUvarint([]byte(nil), 508)), 500},
{InternalKeyKindSingleDelete, "nosuchkey", "", 0},
{InternalKeyKindSet, "binarydata", "\x00", 0},
{InternalKeyKindSet, "binarydata", "\xff", 0},
{InternalKeyKindMerge, "merge", "mergedata", 0},
{InternalKeyKindMerge, "merge", "", 0},
{InternalKeyKindMerge, "", "", 0},
{InternalKeyKindRangeDelete, "a", "b", 0},
{InternalKeyKindRangeDelete, "", "", 0},
{InternalKeyKindLogData, "logdata", "", 0},
{InternalKeyKindLogData, "", "", 0},
{InternalKeyKindRangeKeyDelete, "grass", "green", 0},
{InternalKeyKindRangeKeyDelete, "", "", 0},
{InternalKeyKindDeleteSized, "nosuchkey", string(binary.AppendUvarint([]byte(nil), 11)), 2},
}
var b Batch
for _, tc := range testCases {
Expand All @@ -103,6 +106,8 @@ func TestBatch(t *testing.T) {
_ = b.Merge([]byte(tc.key), []byte(tc.value), nil)
case InternalKeyKindDelete:
_ = b.Delete([]byte(tc.key), nil)
case InternalKeyKindDeleteSized:
_ = b.DeleteSized([]byte(tc.key), tc.valueInt, nil)
case InternalKeyKindSingleDelete:
_ = b.SingleDelete([]byte(tc.key), nil)
case InternalKeyKindRangeDelete:
Expand Down Expand Up @@ -139,6 +144,10 @@ func TestBatch(t *testing.T) {
copy(d.Key, key)
copy(d.Value, value)
d.Finish()
case InternalKeyKindDeleteSized:
d := b.DeleteSizedDeferred(len(tc.key), tc.valueInt)
copy(d.Key, key)
d.Finish()
case InternalKeyKindSingleDelete:
d := b.SingleDeleteDeferred(len(key))
copy(d.Key, key)
Expand Down Expand Up @@ -334,6 +343,7 @@ func TestBatchReset(t *testing.T) {
require.Equal(t, batchHeaderLen, len(b.data))
require.Equal(t, uint64(0), b.SeqNum())
require.Equal(t, uint64(0), b.memTableSize)
require.Equal(t, FormatMajorVersion(0x00), b.minimumFormatMajorVersion)
require.Equal(t, b.deferredOp, DeferredBatchOp{})
_ = b.Repr()

Expand Down Expand Up @@ -441,7 +451,7 @@ func TestIndexedBatchMutation(t *testing.T) {
opts := &Options{
Comparer: testkeys.Comparer,
FS: vfs.NewMem(),
FormatMajorVersion: FormatNewest,
FormatMajorVersion: internalFormatNewest,
}
d, err := Open("", opts)
require.NoError(t, err)
Expand Down Expand Up @@ -540,7 +550,7 @@ func TestIndexedBatchMutation(t *testing.T) {
func TestIndexedBatch_GlobalVisibility(t *testing.T) {
opts := &Options{
FS: vfs.NewMem(),
FormatMajorVersion: FormatNewest,
FormatMajorVersion: internalFormatNewest,
Comparer: testkeys.Comparer,
}
d, err := Open("", opts)
Expand Down Expand Up @@ -1452,7 +1462,7 @@ func TestBatchSpanCaching(t *testing.T) {
opts := &Options{
Comparer: testkeys.Comparer,
FS: vfs.NewMem(),
FormatMajorVersion: FormatNewest,
FormatMajorVersion: internalFormatNewest,
}
d, err := Open("", opts)
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestCheckpoint(t *testing.T) {
var memLog base.InMemLogger
opts := &Options{
FS: vfs.WithLogging(mem, memLog.Infof),
FormatMajorVersion: FormatNewest,
FormatMajorVersion: internalFormatNewest,
L0CompactionThreshold: 10,
}

Expand Down Expand Up @@ -296,7 +296,7 @@ func TestCheckpointManyFiles(t *testing.T) {
const checkpointPath = "checkpoint"
opts := &Options{
FS: vfs.NewMem(),
FormatMajorVersion: FormatNewest,
FormatMajorVersion: internalFormatNewest,
DisableAutomaticCompactions: true,
}
// Disable compression to speed up the test.
Expand Down
Loading

0 comments on commit 25a8e9b

Please sign in to comment.