diff --git a/dst/dst_test.go b/dst/dst_test.go
index b6209b608..32169937d 100644
--- a/dst/dst_test.go
+++ b/dst/dst_test.go
@@ -52,7 +52,8 @@ const (
 	compact
 	snapshot
 	rotate
-	restart
+	gracefulRestart
+	hardRestart
 )
 
 func (c command) String() string {
@@ -65,23 +66,26 @@ func (c command) String() string {
 		return "snapshot"
 	case rotate:
 		return "rotate"
-	case restart:
-		return "restart"
+	case gracefulRestart:
+		return "gracefulRestart"
+	case hardRestart:
+		return "hardRestart"
 	default:
 		return ""
 	}
 }
 
-var commands = []command{insert, compact, snapshot, rotate, restart}
+var commands = []command{insert, compact, snapshot, rotate, gracefulRestart, hardRestart}
 
 // probabilities are command probabilities. It is not strictly necessary that
 // these sum to 1.
 var probabilities = map[command]float64{
-	insert:   0.75,
-	compact:  0.25,
-	snapshot: 0.1,
-	rotate:   0.05,
-	restart:  0.01,
+	insert:          0.75,
+	compact:         0.25,
+	snapshot:        0.1,
+	rotate:          0.05,
+	gracefulRestart: 0.01,
+	hardRestart:     0.01,
 }
 
 var cumulativeProbabilities []float64
@@ -163,6 +167,12 @@ type fakeTicker struct {
 }
 
 func (t *fakeTicker) C() <-chan time.Time {
+	// Fake sleep to simulate preemption. This avoids hot loops in the WAL that
+	// can starve other goroutines from running since async preemption is not
+	// implemented in WASM.
+	if rand.Float64() < 0.2 {
+		time.Sleep(1 * time.Millisecond)
+	}
 	return t.c
 }
 
@@ -339,12 +349,6 @@ func newTestLogger(t testing.TB) log.Logger {
 	return logger
 }
 
-// Remove unused warnings for now.
-var (
-	_ = vfsShutdown
-	_ = vfsRestart
-)
-
 // TestDST runs deterministic simulation tests against FrostDB. For true
 // determinism and reproducibility, this test needs to be run with
 // GORANDSEED set, the modified go runtime found at github.com/polarsignals/go,
@@ -400,11 +404,56 @@ func TestDST(t *testing.T) {
 		return nil
 	}
 
-	errg := &errgroup.Group{}
-	errg.SetLimit(32)
+	newErrg := func() *errgroup.Group {
+		errg := &errgroup.Group{}
+		errg.SetLimit(32)
+		return errg
+	}
+
+	errg := newErrg()
 	commandDistribution := make(map[command]int)
 
 	ignoreGoroutinesAtStartOfTest := goleak.IgnoreCurrent()
+
+	waitForRunningGoroutines := func(t *testing.T) {
+		t.Helper()
+		// Unfortunately frostdb doesn't have goroutine lifecycle management
+		// and adding it could lead to subtle issues (e.g. on Close with
+		// many DBs). Instead, this test simply verifies all goroutines
+		// spawned up until this restart eventually exit after n retries.
+		const maxRetries = 10
+		for i := 0; i < maxRetries; i++ {
+			if err := goleak.Find(ignoreGoroutinesAtStartOfTest); err == nil {
+				break
+			} else if i == maxRetries-1 {
+				t.Fatalf("leaked goroutines found on Close: %v", err)
+			} else {
+				time.Sleep(1 * time.Millisecond)
+			}
+		}
+	}
+
+	restart := func(t *testing.T) {
+		t.Helper()
+		storeID++
+		c, err = newStore(
+			storageDir,
+			log.WithPrefix(logger, "storeID", storeID), objectStorage, func(logStore internalWal.LogStore) internalWal.LogStore {
+				logStoreWrapper.LogStore = logStore
+				return logStoreWrapper
+			}, logStoreWrapper.droppedLogsCallback, walTicker,
+		)
+		require.NoError(t, err)
+		newDB, err := c.DB(ctx, dbName)
+		require.NoError(t, err)
+		table, err := newDB.Table(tableName, tableConfig)
+		require.NoError(t, err)
+		db.Store(newDB)
+		tp.Update(table)
+		_, err = w.write(ctx)
+		// This write should succeed.
+		require.NoError(t, err)
+	}
 	for i := 0; i < numCommands; i++ {
 		cmd := genCommand()
 		commandDistribution[cmd]++
@@ -444,48 +493,40 @@ func TestDST(t *testing.T) {
 				}
 				return nil
 			})
-		case restart:
+		case gracefulRestart:
 			// This is a hack to ensure some goroutines are scheduled before
-			// this restart.
+			// this gracefulRestart.
 			// TODO(asubiotto): Figure out if we still need this.
 			time.Sleep(1 * time.Millisecond)
 			// Graceful shutdown.
 			require.NoError(t, c.Close())
 			_ = errg.Wait()
-
-			// Unfortunately frostdb doesn't have goroutine lifecycle management
-			// and adding it could lead to subtle issues (e.g. on Close with
-			// many DBs). Instead, this test simply verifies all goroutines
-			// spawned up until this restart eventually exit after n retries.
-			const maxRetries = 10
-			for i := 0; i < maxRetries; i++ {
-				if err := goleak.Find(ignoreGoroutinesAtStartOfTest); err == nil {
-					break
-				} else if i == maxRetries-1 {
-					t.Fatalf("leaked goroutines found on Close: %v", err)
-				} else {
-					time.Sleep(1 * time.Millisecond)
-				}
-			}
-
-			storeID++
-			c, err = newStore(
-				storageDir,
-				log.WithPrefix(logger, "storeID", storeID), objectStorage, func(logStore internalWal.LogStore) internalWal.LogStore {
-					logStoreWrapper.LogStore = logStore
-					return logStoreWrapper
-				}, logStoreWrapper.droppedLogsCallback, walTicker,
-			)
-			require.NoError(t, err)
-			newDB, err := c.DB(ctx, dbName)
-			require.NoError(t, err)
-			table, err := newDB.Table(tableName, tableConfig)
-			require.NoError(t, err)
-			db.Store(newDB)
-			tp.Update(table)
-			_, err = w.write(ctx)
-			// This write should succeed.
-			require.NoError(t, err)
+			// Reset the errg, otherwise if an error was encountered, it will
+			// be returned on the next Wait call.
+			errg = newErrg()
+			waitForRunningGoroutines(t)
+			restart(t)
+		case hardRestart:
+			// Simulate a hard restart by shutting down the VFS.
+			t.Log("calling vfsShutdown in hardRestart")
+			vfsShutdown()
+			// Wait before closing to avoid writes to the closed
+			// channel below. The snapshot of the directory has already been
+			// taken, so all of this is just cleanup code.
+			_ = errg.Wait()
+			// Reset the errg, otherwise if an error was encountered, it will
+			// be returned on the next Wait call.
+			errg = newErrg()
+			// Close the WAL ticker chan to ensure draining of all WAL entries.
+			// This is done as a workaround so that pending WAL entries are
+			// fully written to the LogStore and recorded as "lost".
+			close(walTicker.c)
+			_ = c.Close()
+			waitForRunningGoroutines(t)
+			walTicker.c = make(chan time.Time, 1)
+			t.Log("calling vfsRestart in hardRestart")
+			vfsRestart()
+			restart(t)
 		}
 	}
 
@@ -502,6 +543,9 @@ func TestDST(t *testing.T) {
 
 	listFiles := func(dir string) string {
 		de, err := os.ReadDir(filepath.Join(storageDir, "databases", dbName, dir))
+		if errors.Is(err, os.ErrNotExist) {
+			return ""
+		}
 		require.NoError(t, err)
 		var files []string
 		for _, e := range de {