dst: fix data verification logic #907

Merged: 2 commits, Jun 20, 2024
104 changes: 56 additions & 48 deletions dst/dst_test.go
@@ -217,21 +217,23 @@ func (w *writerHelper) write(ctx context.Context) (uint64, error) {
}

// testLogStore wraps a LogStore that the WAL uses to write records. The main
-// purpose of this struct is to keep track of what data we can consider to be
-// "committed". This is necessary because WAL writes are async, so a successful
-// write doesn't necessarily mean that write is durable.
+// purpose of this struct is to keep track of what data might be missing at the
+// end of a test run. For example, if we fail to commit log entries due to an
+// fs error, we expect those writes to be lost even though no error was returned
+// to the user (WAL writes are async). The DST can also set a flag on the log
+// store to track writes that are successfully committed in special cases (e.g.
+// when a hard shutdown copies the WAL's directory from under it).
type testLogStore struct {
internalWal.LogStore
// accOnNoError is set to true if the LogStore should accumulate logs even
// on successful commit.
accOnNoError atomic.Bool
// acc does not need to be protected by a mutex since StoreLogs is called
// synchronously.
acc int64checksum
}

-func (s *testLogStore) StoreLogs(logs []types.LogEntry) error {
-if err := s.LogStore.StoreLogs(logs); err != nil {
-return err
-}
-// Successful commit.
+func (s *testLogStore) accumulateLogs(logs []types.LogEntry) error {
for _, log := range logs {
walRec := &walpb.Record{}
if err := walRec.UnmarshalVT(log.Data); err != nil {
@@ -259,6 +261,25 @@ func (s *testLogStore) StoreLogs(logs []types.LogEntry) error {
return nil
}

// droppedLogsCallback is called by the WAL when logs are dropped on WAL close
// timeout. The timeout usually happens when there is a truncation error due to
// a hard shutdown, so nextTx is never updated.
func (s *testLogStore) droppedLogsCallback(logs []types.LogEntry) {
_ = s.accumulateLogs(logs)
}

func (s *testLogStore) StoreLogs(logs []types.LogEntry) error {
if err := s.LogStore.StoreLogs(logs); err != nil {
_ = s.accumulateLogs(logs)
return err
}
// Successful commit.
if s.accOnNoError.Load() {
return s.accumulateLogs(logs)
}
return nil
}
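
Aside, not part of the diff: a minimal sketch of the accumulator this wrapper relies on, assuming int64checksum looks roughly like the following. Only the count, sum, and timestamps fields are actually referenced in this PR; the real type is defined elsewhere in dst_test.go.

// int64checksum (sketch) records every timestamp handed to the log store so
// the test can later reconcile what was read back against what was written.
type int64checksum struct {
	count      int64   // number of rows accumulated
	sum        int64   // running sum of the accumulated timestamps
	timestamps []int64 // each timestamp seen, used to prune expectedTimestamps
}

func (c *int64checksum) add(ts int64) {
	c.count++
	c.sum += ts
	c.timestamps = append(c.timestamps, ts)
}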

// canIgnoreError returns whether the given error can be ignored. Specifically,
// errors returned by operations when the database is closing are not a problem.
func canIgnoreError(err error) bool {
@@ -272,6 +293,7 @@ func newStore(
logger log.Logger,
objectStorage *frostdb.DefaultObjstoreBucket,
newLogStoreWrapper func(internalWal.LogStore) internalWal.LogStore,
droppedLogsCallback func([]types.LogEntry),
fakeTicker wal.Ticker,
) (*frostdb.ColumnStore, error) {
return frostdb.New(
@@ -291,7 +313,11 @@
// is set to be very large in order to trigger snapshots manually.
frostdb.WithSnapshotTriggerSize(math.MaxInt64),
frostdb.WithTestingOptions(
-frostdb.WithTestingWalOptions(wal.WithTestingLogStoreWrapper(newLogStoreWrapper), wal.WithTestingLoopTicker(fakeTicker)),
+frostdb.WithTestingWalOptions(
+wal.WithTestingLogStoreWrapper(newLogStoreWrapper),
+wal.WithTestingLoopTicker(fakeTicker),
+wal.WithTestingCallbackWithDroppedLogsOnClose(droppedLogsCallback),
+),
),
)
}
@@ -315,12 +341,18 @@ func newTestLogger(t testing.TB) log.Logger {

// TestDST runs deterministic simulation tests against FrostDB. For true
// determinism and reproducibility, this test needs to be run with
-// POLARSIGNALS_RANDOM_SEED set, the modified go runtime found at
-// github.com/asubiotto/go (for now), and GOOS=wasip1 GOARCH=wasm.
+// GORANDSEED set, the modified go runtime found at github.com/polarsignals/go,
+// and GOOS=wasip1 GOARCH=wasm.
func TestDST(t *testing.T) {
if os.Getenv(randomSeedKey) == "" {
t.Skipf("%s not set, skipping deterministic simulation tests", randomSeedKey)
}
if os.Getenv("GOOS") != "wasip1" || os.Getenv("GOARCH") != "wasm" {
t.Skipf(
"GOOS=%s != wasip1 GOARCH=%s != wasm, skipping deterministic simulation tests",
os.Getenv("GOOS"), os.Getenv("GOARCH"),
)
}

t.Log("Running DST using random seed:", os.Getenv(randomSeedKey))
logger := newTestLogger(t)
@@ -335,7 +367,7 @@ func TestDST(t *testing.T) {
storageDir, log.WithPrefix(logger, "storeID", storeID), objectStorage, func(logStore internalWal.LogStore) internalWal.LogStore {
logStoreWrapper.LogStore = logStore
return logStoreWrapper
-}, walTicker,
+}, logStoreWrapper.droppedLogsCallback, walTicker,
)
require.NoError(t, err)

@@ -442,7 +474,7 @@ func TestDST(t *testing.T) {
log.WithPrefix(logger, "storeID", storeID), objectStorage, func(logStore internalWal.LogStore) internalWal.LogStore {
logStoreWrapper.LogStore = logStore
return logStoreWrapper
-}, walTicker,
+}, logStoreWrapper.droppedLogsCallback, walTicker,
)
require.NoError(t, err)
newDB, err := c.DB(ctx, dbName)
@@ -526,45 +558,21 @@ func TestDST(t *testing.T) {
)
}

-// Missing rows. Double check it's not due to dropping WAL entries
-// (which is valid). If the missing writes were written to the WAL then
-// this is real data loss.
-// TODO(asubiotto): The verification code is a little brittle. It is
-// possible that not all rows written are reflected in a read if a
-// restart happened with WAL entries pending. We attempt to verify by
-// subsequently comparing the number of rows read to number of rows
-// committed to WAL.
-// However, for some reason we seem to commit fewer entries than are
-// read.
-// The ColumnStore.Close call below should make this impossible since
-// it should drain any pending WAL entries (and there are no writes in
-// flight). This requires some more investigation and thought. For now,
-// this check is fine.
-// One option is to make sure no missing writes (i.e. any remaining
-// timestamps in expectedTimestamps) are contained in
-// logStoreWrapper.acc.timestamps. I'm worried this would hide real
-// data loss since the rows written to the log store seem to be much
-// less than expected.
-require.Equal(
-t,
-logStoreWrapper.acc.count,
-timestampSum.count,
-"number of rows mismatch, wrote %d, committed %d to WAL, and read %d\n"+
-"timestamps that were expected and not found: %v\n"+
-"timestamps that were encountered more than once: %v",
-w.timestampSum.count, logStoreWrapper.acc.count, timestampSum.count,
-expectedTimestamps, nonUnique,
-)
-}
-if w.timestampSum.sum != timestampSum.sum {
-require.Equal(
-logStoreWrapper.acc.sum,
-timestampSum.sum,

+// Drain currently pending WAL entries by closing.
+require.NoError(t, c.Close())
+
+// Delete the timestamps that were not committed over the lifetime of
+// the test. These were lost.
+for _, v := range logStoreWrapper.acc.timestamps {
+delete(expectedTimestamps, v)
+}
+
+require.Zero(
+t,
+len(expectedTimestamps),
+"number of rows mismatch, wrote %d, and read %d. %d could not be committed to WAL\n"+
+"timestamps that were expected and not found: %v\n",
+w.timestampSum.count, timestampSum.count, logStoreWrapper.acc.count, expectedTimestamps,
+)
}
}
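
To summarize the new verification logic, here is a condensed, commented sketch (the same calls as the diff above, with comments added for exposition): every timestamp the test wrote must either be read back or have been recorded by testLogStore as one that never became durable.

// Close the store first so no WAL entries are still in flight.
require.NoError(t, c.Close())

// acc.timestamps holds writes the log store saw fail, drop on close, or
// (in hard-shutdown cases) commit after the WAL directory was copied; all
// of these may legitimately be missing from the read, so they are not loss.
for _, v := range logStoreWrapper.acc.timestamps {
	delete(expectedTimestamps, v)
}

// Anything still expected was written, never flagged by the log store, and
// never read back: that is real data loss and fails the test.
require.Zero(t, len(expectedTimestamps))
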
5 changes: 3 additions & 2 deletions table.go
@@ -444,12 +444,12 @@ func (t *Table) writeBlock(
if rbo.wg != nil {
defer rbo.wg.Done()
}
-level.Debug(t.logger).Log("msg", "syncing block", "ulid", block.ulid, "size", block.index.Size())
+level.Debug(t.logger).Log("msg", "syncing block", "next_txn", nextTxn, "ulid", block.ulid, "size", block.index.Size())
block.pendingWritersWg.Wait()

// from now on, the block will no longer be modified, we can persist it to disk

-level.Debug(t.logger).Log("msg", "done syncing block", "ulid", block.ulid, "size", block.index.Size())
+level.Debug(t.logger).Log("msg", "done syncing block", "next_txn", nextTxn, "ulid", block.ulid, "size", block.index.Size())

// Persist the block
var err error
@@ -473,6 +473,7 @@
return err
}

level.Debug(t.logger).Log("msg", "recording block persistence in WAL", "ulid", block.ulid, "txn", tx)
if err := t.wal.Log(tx, &walpb.Record{
Entry: &walpb.Entry{
EntryType: &walpb.Entry_TableBlockPersisted_{
18 changes: 18 additions & 0 deletions wal/wal.go
@@ -143,6 +143,7 @@ type FileWAL struct {

newLogStoreWrapper func(wal.LogStore) wal.LogStore
ticker Ticker
testingDroppedLogs func([]types.LogEntry)
}

type logRequest struct {
@@ -217,6 +218,14 @@ func WithTestingLoopTicker(t Ticker) Option {
}
}

// WithTestingCallbackWithDroppedLogsOnClose registers a callback that is
// invoked when the WAL times out on close, with all the entries that could
// not be written.
func WithTestingCallbackWithDroppedLogsOnClose(cb func([]types.LogEntry)) Option {
return func(w *FileWAL) {
w.testingDroppedLogs = cb
}
}
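
A hedged usage sketch of the new option, mirroring how newStore wires it in dst_test.go above (wrap and ticker are placeholder names, not identifiers from this PR):

store, err := frostdb.New(
	frostdb.WithTestingOptions(
		frostdb.WithTestingWalOptions(
			wal.WithTestingLogStoreWrapper(wrap),
			wal.WithTestingLoopTicker(ticker),
			wal.WithTestingCallbackWithDroppedLogsOnClose(func(dropped []types.LogEntry) {
				// Record entries that never reached disk so their absence
				// on read is expected rather than treated as data loss.
			}),
		),
	),
)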

func Open(
logger log.Logger,
path string,
@@ -305,6 +314,15 @@ func (w *FileWAL) run(ctx context.Context) {
level.Error(w.logger).Log(
"msg", "WAL timed out attempting to close",
)
if w.testingDroppedLogs != nil {
batch := make([]types.LogEntry, 0, n)
w.protected.Lock()
for _, v := range w.protected.queue {
batch = append(batch, types.LogEntry{Index: v.tx, Data: v.data})
}
w.protected.Unlock()
w.testingDroppedLogs(batch)
}
return
}
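
One design note on the drain above: the pending queue is snapshotted into batch while w.protected is held, but testingDroppedLogs itself runs only after Unlock, so a test callback that re-enters the WAL cannot deadlock on the mutex.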
