Skip to content

Commit

Permalink
db: fix size adjustment for in-progress compactions
Browse files Browse the repository at this point in the history
In #2436 the semantics of d.mu.compact.inProgress changed, and it's now
possible for that slice to hold compactions whose version edit has already been
applied. During compaction picking, these compactions should not adjust level
sizes because the current LSM already reflects the resulting LSM state.
  • Loading branch information
jbowens committed Jun 10, 2023
1 parent fe84618 commit 898fb2f
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 7 deletions.
6 changes: 6 additions & 0 deletions compaction_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,12 @@ func calculateSizeAdjust(inProgressCompactions []compactionInfo) [numLevels]int6
var sizeAdjust [numLevels]int64
for i := range inProgressCompactions {
c := &inProgressCompactions[i]
// If this compaction's version edit has already been applied, there's
// no need to adjust: The LSM we'll examine will already reflect the
// new LSM state.
if c.versionEditApplied {
continue
}

for _, input := range c.inputs {
real := int64(input.files.SizeSum())
Expand Down
87 changes: 83 additions & 4 deletions compaction_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -1366,9 +1367,41 @@ func TestCompactionPickerPickFile(t *testing.T) {
})
}

type pausableCleaner struct {
mu sync.Mutex
cond sync.Cond
paused bool
cleaner Cleaner
}

func (c *pausableCleaner) Clean(fs vfs.FS, fileType base.FileType, path string) error {
c.mu.Lock()
defer c.mu.Unlock()
for c.paused {
c.cond.Wait()
}
return c.cleaner.Clean(fs, fileType, path)
}

func (c *pausableCleaner) pause() {
c.mu.Lock()
defer c.mu.Unlock()
c.paused = true
}

func (c *pausableCleaner) resume() {
c.mu.Lock()
defer c.mu.Unlock()
c.paused = false
c.cond.Broadcast()
}

func TestCompactionPickerScores(t *testing.T) {
fs := vfs.NewMem()
cleaner := pausableCleaner{cleaner: DeleteCleaner{}}
cleaner.cond.L = &cleaner.mu
opts := &Options{
Cleaner: &cleaner,
Comparer: testkeys.Comparer,
DisableAutomaticCompactions: true,
FormatMajorVersion: FormatNewest,
Expand All @@ -1379,6 +1412,7 @@ func TestCompactionPickerScores(t *testing.T) {
require.NoError(t, err)
defer func() {
if d != nil {
cleaner.resume()
require.NoError(t, closeAllSnapshots(d))
require.NoError(t, d.Close())
}
Expand All @@ -1391,6 +1425,10 @@ func TestCompactionPickerScores(t *testing.T) {
require.NoError(t, closeAllSnapshots(d))
require.NoError(t, d.Close())

if td.HasArg("pause-cleaning") {
cleaner.pause()
}

d, err = runDBDefineCmd(td, opts)
if err != nil {
return err.Error()
Expand All @@ -1413,6 +1451,10 @@ func TestCompactionPickerScores(t *testing.T) {
d.mu.Unlock()
return ""

case "resume-cleaning":
cleaner.resume()
return ""

case "ingest":
if err = runBuildCmd(td, d, d.opts.FS); err != nil {
return err.Error()
Expand All @@ -1428,15 +1470,52 @@ func TestCompactionPickerScores(t *testing.T) {
case "lsm":
return runLSMCmd(td, d)

case "maybe-compact":
buf.Reset()
d.mu.Lock()
d.maybeScheduleCompaction()
fmt.Fprintf(&buf, "%d compactions in progress:", d.mu.compact.compactingCount)
for c := range d.mu.compact.inProgress {
fmt.Fprintf(&buf, "\n%s", c)
}
d.mu.Unlock()
return buf.String()

case "scores":
waitFor := "completion"
td.MaybeScanArgs(t, "wait-for-compaction", &waitFor)

// Wait for any running compactions to complete before calculating
// scores. Otherwise, the output of this command is
// nondeterministic.
d.mu.Lock()
for d.mu.compact.compactingCount > 0 {
d.mu.compact.cond.Wait()
switch waitFor {
case "completion":
d.mu.Lock()
for d.mu.compact.compactingCount > 0 {
d.mu.compact.cond.Wait()
}
d.mu.Unlock()
case "version-edit":
func() {
for {
d.mu.Lock()
wait := len(d.mu.compact.inProgress) > 0
for c := range d.mu.compact.inProgress {
wait = wait && !c.versionEditApplied
}
d.mu.Unlock()
if !wait {
return
}
// d.mu.compact.cond isn't notified until the compaction
// is removed from inProgress, so we need to just sleep
// and check again soon.
time.Sleep(10 * time.Millisecond)
}
}()
default:
panic(fmt.Sprintf("unrecognized `wait-for-compaction` value: %q", waitFor))
}
d.mu.Unlock()

buf.Reset()
fmt.Fprintf(&buf, "L Size Score\n")
Expand Down
8 changes: 5 additions & 3 deletions data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1218,7 +1218,9 @@ func runForceIngestCmd(td *datadriven.TestData, d *DB) error {

func runLSMCmd(td *datadriven.TestData, d *DB) string {
d.mu.Lock()
s := d.mu.versions.currentVersion().String()
d.mu.Unlock()
return s
defer d.mu.Unlock()
if td.HasArg("verbose") {
return d.mu.versions.currentVersion().DebugString(d.opts.Comparer.FormatKey)
}
return d.mu.versions.currentVersion().String()
}
93 changes: 93 additions & 0 deletions testdata/compaction_picker_scores
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,96 @@ lsm
----
6:
000006:[a#0,SET-e#0,SET]

# Test the adjustment of level sizes to accommodate in-progress compactions. A
# compaction may be "inProgress" if it's already been applied, but is still
# deleting obsolete files. These compactions' effects have already been applied
# to the LSM, so size adjustment should ignore them and not doubly adjust sizes.

define lbase-max-bytes=65536 enable-table-stats=false auto-compactions=on pause-cleaning
L5
aa.SET.2:<rand-bytes=131072>
bb.SET.2:<rand-bytes=131072>
cc.SET.2:<rand-bytes=131072>
dd.SET.2:<rand-bytes=131072>
L5
e.SET.2:<rand-bytes=131072>
L6
a.SET.1:<rand-bytes=65536>
b.SET.1:<rand-bytes=65536>
c.SET.1:<rand-bytes=65536>
d.SET.1:<rand-bytes=65536>
L6
e.SET.1:<rand-bytes=131072>
----
5:
000004:[aa#2,SET-dd#2,SET]
000005:[e#2,SET-e#2,SET]
6:
000006:[a#1,SET-d#1,SET]
000007:[e#1,SET-e#1,SET]

scores
----
L Size Score
L0 0 B 0.0
L1 0 B 0.0
L2 0 B 0.0
L3 0 B 0.0
L4 0 B 0.0
L5 641 K 6.3
L6 385 K -

lsm verbose
----
5:
000004:[aa#2,SET-dd#2,SET] points:[aa#2,SET-dd#2,SET]
000005:[e#2,SET-e#2,SET] points:[e#2,SET-e#2,SET]
6:
000006:[a#1,SET-d#1,SET] points:[a#1,SET-d#1,SET]
000007:[e#1,SET-e#1,SET] points:[e#1,SET-e#1,SET]

# Attempting to schedule a compaction should begin a L5->L6 compaction.

maybe-compact
----
1 compactions in progress:
5: 000004:aa#2,1-dd#2,1
6: 000006:a#1,1-d#1,1

# The scores and sizes should be stable between when the version edit has been
# applied but the compaction has not completed, and when the compaction is
# finally complete.

scores wait-for-compaction=version-edit
----
L Size Score
L0 0 B 0.0
L1 0 B 0.0
L2 0 B 0.0
L3 0 B 0.0
L4 0 B 0.0
L5 129 K 0.5
L6 898 K -

lsm
----
5:
000005:[e#2,SET-e#2,SET]
6:
000008:[a#0,SET-dd#0,SET]
000007:[e#1,SET-e#1,SET]

resume-cleaning
----

scores wait-for-compaction=completion
----
L Size Score
L0 0 B 0.0
L1 0 B 0.0
L2 0 B 0.0
L3 0 B 0.0
L4 0 B 0.0
L5 129 K 0.5
L6 898 K -

0 comments on commit 898fb2f

Please sign in to comment.