dst: add hardRestart command #895

Closed · wants to merge 2 commits
150 changes: 97 additions & 53 deletions dst/dst_test.go
@@ -52,7 +52,8 @@ const (
compact
snapshot
rotate
restart
gracefulRestart
hardRestart
)

func (c command) String() string {
@@ -65,23 +66,26 @@ func (c command) String() string {
return "snapshot"
case rotate:
return "rotate"
case restart:
return "restart"
case gracefulRestart:
return "gracefulRestart"
case hardRestart:
return "hardRestart"
default:
return "<unknown>"
}
}

var commands = []command{insert, compact, snapshot, rotate, restart}
var commands = []command{insert, compact, snapshot, rotate, gracefulRestart, hardRestart}

// probabilities are command probabilities. It is not strictly necessary that
// these sum to 1.
var probabilities = map[command]float64{
insert: 0.75,
compact: 0.25,
snapshot: 0.1,
rotate: 0.05,
restart: 0.01,
insert: 0.75,
compact: 0.25,
snapshot: 0.1,
rotate: 0.05,
gracefulRestart: 0.01,
hardRestart: 0.01,
}

var cumulativeProbabilities []float64
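genCommand itself is elided from this diff. As a rough sketch of how it presumably consumes cumulativeProbabilities (an assumption based on the names above, not the PR's actual code): the uniform draw is scaled by the final cumulative sum, which is why the probabilities above need not sum to 1.

func sampleCommand() command {
	// Hypothetical sketch, not the actual genCommand: scale a uniform draw
	// by the total probability mass, then pick the first command whose
	// cumulative probability exceeds it.
	total := cumulativeProbabilities[len(cumulativeProbabilities)-1]
	r := rand.Float64() * total
	for i, p := range cumulativeProbabilities {
		if r < p {
			return commands[i]
		}
	}
	return commands[len(commands)-1]
}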
@@ -163,6 +167,12 @@ type fakeTicker struct {
}

func (t *fakeTicker) C() <-chan time.Time {
// Fake sleep to simulate preemption. This avoids hot loops in the WAL that
// can starve other goroutines, since async preemption is not implemented
// on WASM.
if rand.Float64() < 0.2 {
time.Sleep(1 * time.Millisecond)
}
return t.c
}
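The fakeTicker's field list is elided by the diff. From its usage here and in the hardRestart case below (t.c, close(walTicker.c), walTicker.c = make(chan time.Time, 1)), its shape is presumably:

// Assumed shape, reconstructed from usage in this diff: a manually driven
// ticker whose channel the test can close and recreate around restarts.
type fakeTicker struct {
	c chan time.Time
}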

@@ -339,12 +349,6 @@ func newTestLogger(t testing.TB) log.Logger {
return logger
}

// Remove unused warnings for now.
var (
_ = vfsShutdown
_ = vfsRestart
)

// TestDST runs deterministic simulation tests against FrostDB. For true
// determinism and reproducibility, this test needs to be run with
// GORANDSEED set, the modified go runtime found at github.com/polarsignals/go,
@@ -400,11 +404,56 @@ func TestDST(t *testing.T) {
return nil
}

errg := &errgroup.Group{}
errg.SetLimit(32)
newErrg := func() *errgroup.Group {
errg := &errgroup.Group{}
errg.SetLimit(32)
return errg
}

errg := newErrg()
commandDistribution := make(map[command]int)

ignoreGoroutinesAtStartOfTest := goleak.IgnoreCurrent()

waitForRunningGoroutines := func(t *testing.T) {
t.Helper()
// Unfortunately frostdb doesn't have goroutine lifecycle management
// and adding it could lead to subtle issues (e.g. on Close with
// many DBs). Instead, this test simply verifies all goroutines
// spawned up until this restart eventually exit after n retries.
const maxRetries = 10
for i := 0; i < maxRetries; i++ {
if err := goleak.Find(ignoreGoroutinesAtStartOfTest); err == nil {
break
} else if i == maxRetries-1 {
t.Fatalf("leaked goroutines found on Close: %v", err)
} else {
time.Sleep(1 * time.Millisecond)
}
}
}

restart := func(t *testing.T) {
t.Helper()
storeID++
c, err = newStore(
storageDir,
log.WithPrefix(logger, "storeID", storeID), objectStorage, func(logStore internalWal.LogStore) internalWal.LogStore {
logStoreWrapper.LogStore = logStore
return logStoreWrapper
}, logStoreWrapper.droppedLogsCallback, walTicker,
)
require.NoError(t, err)
newDB, err := c.DB(ctx, dbName)
require.NoError(t, err)
table, err := newDB.Table(tableName, tableConfig)
require.NoError(t, err)
db.Store(newDB)
tp.Update(table)
_, err = w.write(ctx)
// This write should succeed.
require.NoError(t, err)
}
for i := 0; i < numCommands; i++ {
cmd := genCommand()
commandDistribution[cmd]++
@@ -444,48 +493,40 @@ func TestDST(t *testing.T) {
}
return nil
})
case restart:
case gracefulRestart:
// This is a hack to ensure some goroutines are scheduled before
// this restart.
// this gracefulRestart.
// TODO(asubiotto): Figure out if we still need this.
time.Sleep(1 * time.Millisecond)
// Graceful shutdown.
require.NoError(t, c.Close())
_ = errg.Wait()

// Unfortunately frostdb doesn't have goroutine lifecycle management
// and adding it could lead to subtle issues (e.g. on Close with
// many DBs). Instead, this test simply verifies all goroutines
// spawned up until this restart eventually exit after n retries.
const maxRetries = 10
for i := 0; i < maxRetries; i++ {
if err := goleak.Find(ignoreGoroutinesAtStartOfTest); err == nil {
break
} else if i == maxRetries-1 {
t.Fatalf("leaked goroutines found on Close: %v", err)
} else {
time.Sleep(1 * time.Millisecond)
}
}

storeID++
c, err = newStore(
storageDir,
log.WithPrefix(logger, "storeID", storeID), objectStorage, func(logStore internalWal.LogStore) internalWal.LogStore {
logStoreWrapper.LogStore = logStore
return logStoreWrapper
}, logStoreWrapper.droppedLogsCallback, walTicker,
)
require.NoError(t, err)
newDB, err := c.DB(ctx, dbName)
require.NoError(t, err)
table, err := newDB.Table(tableName, tableConfig)
require.NoError(t, err)
db.Store(newDB)
tp.Update(table)
_, err = w.write(ctx)
// This write should succeed.
require.NoError(t, err)
// Reset the errg; otherwise, if an error was encountered, it will
// be returned on the next Wait call.
errg = newErrg()
waitForRunningGoroutines(t)
restart(t)
case hardRestart:
// Simulate a hard restart by shutting down the VFS.
t.Log("calling vfsShutdown in hardRestart")
vfsShutdown()
// Wait before closing to avoid in-flight writes hitting the closed
// channel below. The snapshot of the directory has already been
// taken, so all of this is just cleanup code.
_ = errg.Wait()
// Reset the errg; otherwise, if an error was encountered, it will
// be returned on the next Wait call.
errg = newErrg()
// Close the WAL ticker chan to ensure draining of all WAL entries.
// This is done as a workaround so that pending WAL entries are
// fully written to the LogStore and recorded as "lost".
close(walTicker.c)
_ = c.Close()
waitForRunningGoroutines(t)
walTicker.c = make(chan time.Time, 1)
t.Log("calling vfsRestart in hardRestart")
vfsRestart()
restart(t)
}
}
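vfsShutdown and vfsRestart are defined elsewhere in this package and are not part of this diff. As a loose, hypothetical illustration of the fail-stop behavior the hardRestart case relies on (not the repository's actual implementation), a crash can be simulated by a global switch that fails every subsequent filesystem operation while leaving the on-disk state frozen:

// Hypothetical sketch only; the real vfsShutdown/vfsRestart live outside
// this diff. Assumes "errors" and "sync/atomic" are imported.
var vfsDown atomic.Bool

func vfsShutdown() { vfsDown.Store(true) }
func vfsRestart()  { vfsDown.Store(false) }

// vfsCheck would be consulted by every simulated file operation, so that a
// downed VFS rejects writes instead of mutating the captured disk state.
func vfsCheck() error {
	if vfsDown.Load() {
		return errors.New("vfs is shut down")
	}
	return nil
}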

@@ -502,6 +543,9 @@ func TestDST(t *testing.T) {

listFiles := func(dir string) string {
de, err := os.ReadDir(filepath.Join(storageDir, "databases", dbName, dir))
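// Tolerate a missing directory (it may never have been created, e.g. if
// no snapshots were taken before a hard restart) and treat it as empty.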
if errors.Is(err, os.ErrNotExist) {
return ""
}
require.NoError(t, err)
var files []string
for _, e := range de {