diff --git a/dst/dst_test.go b/dst/dst_test.go
index 6096cdaac..7f0593016 100644
--- a/dst/dst_test.go
+++ b/dst/dst_test.go
@@ -217,21 +217,23 @@ func (w *writerHelper) write(ctx context.Context) (uint64, error) {
 }
 
 // testLogStore wraps a LogStore that the WAL uses to write records. The main
-// purpose of this struct is to keep track of what data we can consider to be
-// "committed". This is necessary because WAL writes are async, so a successful
-// write doesn't necessarily mean that write is durable.
+// purpose of this struct is to keep track of what data might be missing at the
+// end of a test run. For example, if we fail to commit log entries due to an
+// fs error, we expect these writes to be lost even though no error was
+// returned to the user (WAL writes are async). The DST can also set a flag on
+// the log store to track writes that are successfully committed in special
+// cases (e.g. when a hard shutdown copies the WAL's directory from under it).
 type testLogStore struct {
 	internalWal.LogStore
+	// accOnNoError is set to true if the LogStore should accumulate logs even
+	// on successful commit.
+	accOnNoError atomic.Bool
 	// acc does not need to be protected by a mutex since StoreLogs is called
 	// synchronously.
 	acc int64checksum
 }
 
-func (s *testLogStore) StoreLogs(logs []types.LogEntry) error {
-	if err := s.LogStore.StoreLogs(logs); err != nil {
-		return err
-	}
-	// Successful commit.
+func (s *testLogStore) accumulateLogs(logs []types.LogEntry) error {
 	for _, log := range logs {
 		walRec := &walpb.Record{}
 		if err := walRec.UnmarshalVT(log.Data); err != nil {
@@ -259,6 +261,25 @@ func (s *testLogStore) StoreLogs(logs []types.LogEntry) error {
 	return nil
 }
 
+// droppedLogsCallback is called by the WAL when logs are dropped on WAL close
+// timeout. The timeout usually happens when there is a truncation error due to
+// a hard shutdown, so nextTx is never updated.
+func (s *testLogStore) droppedLogsCallback(logs []types.LogEntry) {
+	_ = s.accumulateLogs(logs)
+}
+
+func (s *testLogStore) StoreLogs(logs []types.LogEntry) error {
+	if err := s.LogStore.StoreLogs(logs); err != nil {
+		_ = s.accumulateLogs(logs)
+		return err
+	}
+	// Successful commit.
+	if s.accOnNoError.Load() {
+		return s.accumulateLogs(logs)
+	}
+	return nil
+}
+
 // canIgnoreError returns whether the given error can be ignored. Specifically,
 // errors returned by operations when the database is closing are not a problem.
 func canIgnoreError(err error) bool {
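The StoreLogs split above is the core of the DST's loss accounting: entries are accumulated when a commit fails (the caller was already told the write succeeded, since WAL writes are async), and also on success when accOnNoError is set. Here is a minimal, self-contained sketch of that wrapper pattern; the LogEntry and LogStore types and the slice accumulator are simplified stand-ins for types.LogEntry, internalWal.LogStore, and int64checksum, not the real definitions:

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// LogEntry and LogStore are simplified stand-ins for types.LogEntry and
// internalWal.LogStore in the diff above.
type LogEntry struct {
	Index uint64
	Data  []byte
}

type LogStore interface {
	StoreLogs(logs []LogEntry) error
}

// trackingLogStore mirrors testLogStore: it records entries that may be
// missing at the end of a run. By default only failed commits are
// accumulated; setting accOnNoError also accumulates successful ones.
type trackingLogStore struct {
	LogStore
	accOnNoError atomic.Bool
	// acc stands in for the int64checksum accumulator; no mutex is needed
	// because StoreLogs is called synchronously.
	acc []uint64
}

func (s *trackingLogStore) accumulate(logs []LogEntry) {
	for _, l := range logs {
		s.acc = append(s.acc, l.Index)
	}
}

func (s *trackingLogStore) StoreLogs(logs []LogEntry) error {
	if err := s.LogStore.StoreLogs(logs); err != nil {
		// Failed commit: the write was acked to the caller but is not
		// durable, so these entries are expected to be lost.
		s.accumulate(logs)
		return err
	}
	if s.accOnNoError.Load() {
		s.accumulate(logs)
	}
	return nil
}

// failingStore stands in for an injected fs error.
type failingStore struct{}

func (failingStore) StoreLogs([]LogEntry) error { return errors.New("injected fs error") }

func main() {
	s := &trackingLogStore{LogStore: failingStore{}}
	_ = s.StoreLogs([]LogEntry{{Index: 1}, {Index: 2}})
	fmt.Println("entries expected to be lost:", s.acc) // [1 2]
}
```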
@@ -272,6 +293,7 @@ func newStore(
 	logger log.Logger,
 	objectStorage *frostdb.DefaultObjstoreBucket,
 	newLogStoreWrapper func(internalWal.LogStore) internalWal.LogStore,
+	droppedLogsCallback func([]types.LogEntry),
 	fakeTicker wal.Ticker,
 ) (*frostdb.ColumnStore, error) {
 	return frostdb.New(
@@ -291,7 +313,11 @@ func newStore(
 		// is set to be very large in order to trigger snapshots manually.
 		frostdb.WithSnapshotTriggerSize(math.MaxInt64),
 		frostdb.WithTestingOptions(
-			frostdb.WithTestingWalOptions(wal.WithTestingLogStoreWrapper(newLogStoreWrapper), wal.WithTestingLoopTicker(fakeTicker)),
+			frostdb.WithTestingWalOptions(
+				wal.WithTestingLogStoreWrapper(newLogStoreWrapper),
+				wal.WithTestingLoopTicker(fakeTicker),
+				wal.WithTestingCallbackWithDroppedLogsOnClose(droppedLogsCallback),
+			),
 		),
 	)
 }
@@ -315,12 +341,18 @@ func newTestLogger(t testing.TB) log.Logger {
 // TestDST runs deterministic simulation tests against FrostDB. For true
 // determinism and reproducibility, this test needs to be run with
-// POLARSIGNALS_RANDOM_SEED set, the modified go runtime found at
-// github.com/asubiotto/go (for now), and GOOS=wasip1 GOARCH=wasm.
+// GORANDSEED set, the modified go runtime found at github.com/polarsignals/go,
+// and GOOS=wasip1 GOARCH=wasm.
 func TestDST(t *testing.T) {
 	if os.Getenv(randomSeedKey) == "" {
 		t.Skipf("%s not set, skipping deterministic simulation tests", randomSeedKey)
 	}
+	if os.Getenv("GOOS") != "wasip1" || os.Getenv("GOARCH") != "wasm" {
+		t.Skipf(
+			"GOOS=%s/GOARCH=%s is not wasip1/wasm, skipping deterministic simulation tests",
+			os.Getenv("GOOS"), os.Getenv("GOARCH"),
+		)
+	}
 
 	t.Log("Running DST using random seed:", os.Getenv(randomSeedKey))
 	logger := newTestLogger(t)
@@ -335,7 +367,7 @@ func TestDST(t *testing.T) {
 		storageDir,
 		log.WithPrefix(logger, "storeID", storeID), objectStorage, func(logStore internalWal.LogStore) internalWal.LogStore {
 			logStoreWrapper.LogStore = logStore
 			return logStoreWrapper
-		}, walTicker,
+		}, logStoreWrapper.droppedLogsCallback, walTicker,
 	)
 	require.NoError(t, err)
@@ -442,7 +474,7 @@ func TestDST(t *testing.T) {
 		log.WithPrefix(logger, "storeID", storeID), objectStorage, func(logStore internalWal.LogStore) internalWal.LogStore {
 			logStoreWrapper.LogStore = logStore
 			return logStoreWrapper
-		}, walTicker,
+		}, logStoreWrapper.droppedLogsCallback, walTicker,
 	)
 	require.NoError(t, err)
 	newDB, err := c.DB(ctx, dbName)
@@ -526,45 +558,21 @@ func TestDST(t *testing.T) {
 			)
 		}
 
-		// Missing rows. Double check it's not due to dropping WAL entries
-		// (which is valid). If the missing writes were written to the WAL then
-		// this is real data loss.
-		// TODO(asubiotto): The verification code is a little brittle. It is
-		// possible that not all rows written are reflected in a read if a
-		// restart happened with WAL entries pending. We attempt to verify by
-		// subsequently comparing the number of rows read to number of rows
-		// committed to WAL.
-		// However, for some reason we seem to commit fewer entries than are
-		// read.
-		// The ColumnStore.Close call below should make this impossible since
-		// it should drain any pending WAL entries (and there are no writes in
-		// flight). This requires some more investigation and thought. For now,
-		// this check is fine.
-		// One option is to make sure no missing writes (i.e. any remaining
-		// timestamps in expectedTimestamps) are contained in
-		// logStoreWrapper.acc.timestamps. I'm worried this would hide real
-		// data loss since the rows written to the log store seem to be much
-		// less than expected.
-
 		// Drain currently pending WAL entries by closing.
 		require.NoError(t, c.Close())
-		require.Equal(
-			t,
-			logStoreWrapper.acc.count,
-			timestampSum.count,
-			"number of rows mismatch, wrote %d, committed %d to WAL, and read %d\n"+
-				"timestamps that were expected and not found: %v\n"+
-				"timestamps that were encountered more than once: %v",
-			w.timestampSum.count, logStoreWrapper.acc.count, timestampSum.count,
-			expectedTimestamps, nonUnique,
-		)
-	}
-
-	if w.timestampSum.sum != timestampSum.sum {
-		require.Equal(
+		// Delete the timestamps that were not committed over the lifetime of
+		// the test. These were lost.
+		for _, v := range logStoreWrapper.acc.timestamps {
+			delete(expectedTimestamps, v)
+		}
+
+		require.Zero(
 			t,
-			logStoreWrapper.acc.sum,
-			timestampSum.sum,
+			len(expectedTimestamps),
+			"row count mismatch: wrote %d, read %d (%d could not be committed to the WAL)\n"+
+				"timestamps that were expected and not found: %v\n",
+			w.timestampSum.count, timestampSum.count, logStoreWrapper.acc.count,
+			expectedTimestamps,
 		)
 	}
 }
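With that accounting in place, the rewritten verification reduces to a set difference: anything that was written, never read back, and never recorded as uncommitted is unexplained data loss. A sketch of that reconciliation; verifyNoDataLoss is an illustrative name, not a helper from the test:

```go
package main

import "fmt"

// verifyNoDataLoss deletes every timestamp known to be uncommitted (lost for
// a legitimate reason, e.g. dropped on WAL close timeout) from the set of
// expected-but-unread timestamps. Whatever remains is unexplained data loss.
func verifyNoDataLoss(expected map[uint64]struct{}, uncommitted []uint64) []uint64 {
	for _, ts := range uncommitted {
		delete(expected, ts)
	}
	lost := make([]uint64, 0, len(expected))
	for ts := range expected {
		lost = append(lost, ts)
	}
	return lost
}

func main() {
	// Timestamps that were written but not observed by the final read.
	expected := map[uint64]struct{}{10: {}, 20: {}, 30: {}}
	// Timestamps the wrapped log store accumulated as never committed.
	uncommitted := []uint64{10, 30}

	if lost := verifyNoDataLoss(expected, uncommitted); len(lost) > 0 {
		fmt.Println("real data loss:", lost) // [20]
	}
}
```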
diff --git a/table.go b/table.go
index 6d76e60f9..7ac9209b0 100644
--- a/table.go
+++ b/table.go
@@ -444,12 +444,12 @@ func (t *Table) writeBlock(
 		if rbo.wg != nil {
 			defer rbo.wg.Done()
 		}
-		level.Debug(t.logger).Log("msg", "syncing block", "ulid", block.ulid, "size", block.index.Size())
+		level.Debug(t.logger).Log("msg", "syncing block", "next_txn", nextTxn, "ulid", block.ulid, "size", block.index.Size())
 		block.pendingWritersWg.Wait()
 
 		// from now on, the block will no longer be modified, we can persist it to disk
-		level.Debug(t.logger).Log("msg", "done syncing block", "ulid", block.ulid, "size", block.index.Size())
+		level.Debug(t.logger).Log("msg", "done syncing block", "next_txn", nextTxn, "ulid", block.ulid, "size", block.index.Size())
 
 		// Persist the block
 		var err error
@@ -473,6 +473,7 @@ func (t *Table) writeBlock(
 		return err
 	}
 
+	level.Debug(t.logger).Log("msg", "recording block persistence in WAL", "ulid", block.ulid, "txn", tx)
 	if err := t.wal.Log(tx, &walpb.Record{
 		Entry: &walpb.Entry{
 			EntryType: &walpb.Entry_TableBlockPersisted_{
diff --git a/wal/wal.go b/wal/wal.go
index 5eb42fcbd..600d82a97 100644
--- a/wal/wal.go
+++ b/wal/wal.go
@@ -143,6 +143,7 @@ type FileWAL struct {
 
 	newLogStoreWrapper func(wal.LogStore) wal.LogStore
 	ticker             Ticker
+	testingDroppedLogs func([]types.LogEntry)
 }
 
 type logRequest struct {
@@ -217,6 +218,14 @@ func WithTestingLoopTicker(t Ticker) Option {
 	}
 }
 
+// WithTestingCallbackWithDroppedLogsOnClose registers a callback invoked when
+// the WAL times out on close, with all the entries that could not be written.
+func WithTestingCallbackWithDroppedLogsOnClose(cb func([]types.LogEntry)) Option {
+	return func(w *FileWAL) {
+		w.testingDroppedLogs = cb
+	}
+}
+
 func Open(
 	logger log.Logger,
 	path string,
@@ -305,6 +314,15 @@ func (w *FileWAL) run(ctx context.Context) {
 				level.Error(w.logger).Log(
 					"msg", "WAL timed out attempting to close",
 				)
+				if w.testingDroppedLogs != nil {
+					batch := make([]types.LogEntry, 0, n)
+					w.protected.Lock()
+					for _, v := range w.protected.queue {
+						batch = append(batch, types.LogEntry{Index: v.tx, Data: v.data})
+					}
+					w.protected.Unlock()
+					w.testingDroppedLogs(batch)
+				}
 				return
 			}
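The wal.go close-timeout branch snapshots the pending queue under the lock and only then invokes the testing callback, so the callback runs outside the WAL's critical section. A condensed, runnable sketch of the same shape; fileWAL, its mutex, and the queue layout here are placeholders for FileWAL's protected state, not the real fields:

```go
package main

import (
	"fmt"
	"sync"
)

// LogEntry and logRequest are simplified stand-ins for the types used by
// FileWAL; the real queue and locking live in FileWAL's protected struct.
type LogEntry struct {
	Index uint64
	Data  []byte
}

type logRequest struct {
	tx   uint64
	data []byte
}

type fileWAL struct {
	mu                 sync.Mutex
	queue              []logRequest
	testingDroppedLogs func([]LogEntry)
}

// onCloseTimeout mirrors the new branch in FileWAL.run: snapshot the pending
// queue under the lock, then hand the batch to the callback after unlocking
// so the callback cannot deadlock against the WAL's internal state.
func (w *fileWAL) onCloseTimeout() {
	if w.testingDroppedLogs == nil {
		return
	}
	w.mu.Lock()
	batch := make([]LogEntry, 0, len(w.queue))
	for _, r := range w.queue {
		batch = append(batch, LogEntry{Index: r.tx, Data: r.data})
	}
	w.mu.Unlock()
	w.testingDroppedLogs(batch)
}

func main() {
	w := &fileWAL{
		queue: []logRequest{{tx: 42, data: []byte("pending")}},
		testingDroppedLogs: func(dropped []LogEntry) {
			for _, e := range dropped {
				fmt.Println("dropped tx:", e.Index)
			}
		},
	}
	w.onCloseTimeout() // dropped tx: 42
}
```

Invoking the callback after Unlock matches the order in the diff and keeps a slow or re-entrant test callback from holding up the WAL's protected state.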