dst: fix data verification logic
This commit updates the verification logic to expect all successfully written
timestamps except those for which there was an error writing the value to the
WAL. Previously, the fake log store accumulated correctly written values, but
this wasn't of much use, so it has been changed to accumulate the values that
could not be written to the WAL and to remove those from the expected values.
asubiotto committed Jun 20, 2024
1 parent a5af5df commit 3fdf999
Showing 2 changed files with 74 additions and 48 deletions.
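
To make the new check concrete, the idea can be sketched as a small helper
(hypothetical code, not part of this commit; the names and the int64 timestamp
type are illustrative, and it assumes the testify require package the test
already uses):

package dst

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// verifyNoDataLoss sketches the rule described in the commit message: a write
// is only expected to be readable if it did not fail to reach the WAL.
func verifyNoDataLoss(t *testing.T, expected map[int64]struct{}, failedWALWrites, read []int64) {
	for _, ts := range read {
		delete(expected, ts) // read back, so accounted for
	}
	for _, ts := range failedWALWrites {
		delete(expected, ts) // never reached the WAL, so expected to be lost
	}
	// Anything left over was written, never reported as a failed WAL write,
	// and never read back: real data loss.
	require.Zero(t, len(expected), "timestamps lost: %v", expected)
}
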
104 changes: 56 additions & 48 deletions dst/dst_test.go
@@ -217,21 +217,23 @@ func (w *writerHelper) write(ctx context.Context) (uint64, error) {
}

// testLogStore wraps a LogStore that the WAL uses to write records. The main
// purpose of this struct is to keep track of what data we can consider to be
// "committed". This is necessary because WAL writes are async, so a successful
// write doesn't necessarily mean that write is durable.
// purpose of this struct is to keep track of what data might be missing at the
// end of a test run. For example, if we fail to commit log entries due to an
fs error, we expect these writes to be lost even though no error was returned
// to the user (WAL writes are async). The DST can also set a flag on the log
// store to track writes that are successfully committed in special cases (e.g.
// when a hard shutdown copies the WAL's directory from under it).
type testLogStore struct {
internalWal.LogStore
// accOnNoError is set to true if the LogStore should accumulate logs even
// on successful commit.
accOnNoError atomic.Bool
// acc does not need to be protected by a mutex since StoreLogs is called
// synchronously.
acc int64checksum
}

func (s *testLogStore) StoreLogs(logs []types.LogEntry) error {
if err := s.LogStore.StoreLogs(logs); err != nil {
return err
}
// Successful commit.
func (s *testLogStore) accumulateLogs(logs []types.LogEntry) error {
for _, log := range logs {
walRec := &walpb.Record{}
if err := walRec.UnmarshalVT(log.Data); err != nil {
@@ -259,6 +261,25 @@ func (s *testLogStore) StoreLogs(logs []types.LogEntry) error {
return nil
}

// droppedLogsCallback is called by the WAL when logs are dropped on WAL close
// timeout. The timeout usually happens when there is a truncation error due to
// a hard shutdown, so nextTx is never updated.
func (s *testLogStore) droppedLogsCallback(logs []types.LogEntry) {
_ = s.accumulateLogs(logs)
}

func (s *testLogStore) StoreLogs(logs []types.LogEntry) error {
if err := s.LogStore.StoreLogs(logs); err != nil {
_ = s.accumulateLogs(logs)
return err
}
// Successful commit.
if s.accOnNoError.Load() {
return s.accumulateLogs(logs)
}
return nil
}

// canIgnoreError returns whether the given error can be ignored. Specifically,
// errors returned by operations when the database is closing are not a problem.
func canIgnoreError(err error) bool {
@@ -272,6 +293,7 @@ func newStore(
logger log.Logger,
objectStorage *frostdb.DefaultObjstoreBucket,
newLogStoreWrapper func(internalWal.LogStore) internalWal.LogStore,
droppedLogsCallback func([]types.LogEntry),
fakeTicker wal.Ticker,
) (*frostdb.ColumnStore, error) {
return frostdb.New(
@@ -291,7 +313,11 @@
// is set to be very large in order to trigger snapshots manually.
frostdb.WithSnapshotTriggerSize(math.MaxInt64),
frostdb.WithTestingOptions(
frostdb.WithTestingWalOptions(wal.WithTestingLogStoreWrapper(newLogStoreWrapper), wal.WithTestingLoopTicker(fakeTicker)),
frostdb.WithTestingWalOptions(
wal.WithTestingLogStoreWrapper(newLogStoreWrapper),
wal.WithTestingLoopTicker(fakeTicker),
wal.WithTestingCallbackWithDroppedLogsOnClose(droppedLogsCallback),
),
),
)
}
@@ -315,12 +341,18 @@ func newTestLogger(t testing.TB) log.Logger {

// TestDST runs deterministic simulation tests against FrostDB. For true
// determinism and reproducibility, this test needs to be run with
// POLARSIGNALS_RANDOM_SEED set, the modified go runtime found at
// github.com/asubiotto/go (for now), and GOOS=wasip1 GOARCH=wasm.
// GORANDSEED set, the modified go runtime found at github.com/polarsignals/go,
// and GOOS=wasip1 GOARCH=wasm.
func TestDST(t *testing.T) {
if os.Getenv(randomSeedKey) == "" {
t.Skipf("%s not set, skipping deterministic simulation tests", randomSeedKey)
}
if os.Getenv("GOOS") != "wasip1" || os.Getenv("GOARCH") != "wasm" {
t.Skipf(
"GOOS=%s != wasip1 GOARCH=%s != wasm, skipping deterministic simulation tests",
os.Getenv("GOOS"), os.Getenv("GOARCH"),
)
}

t.Log("Running DST using random seed:", os.Getenv(randomSeedKey))
logger := newTestLogger(t)
@@ -335,7 +367,7 @@ func TestDST(t *testing.T) {
storageDir, log.WithPrefix(logger, "storeID", storeID), objectStorage, func(logStore internalWal.LogStore) internalWal.LogStore {
logStoreWrapper.LogStore = logStore
return logStoreWrapper
}, walTicker,
}, logStoreWrapper.droppedLogsCallback, walTicker,
)
require.NoError(t, err)

@@ -442,7 +474,7 @@ func TestDST(t *testing.T) {
log.WithPrefix(logger, "storeID", storeID), objectStorage, func(logStore internalWal.LogStore) internalWal.LogStore {
logStoreWrapper.LogStore = logStore
return logStoreWrapper
}, walTicker,
}, logStoreWrapper.droppedLogsCallback, walTicker,
)
require.NoError(t, err)
newDB, err := c.DB(ctx, dbName)
@@ -526,45 +558,21 @@ func TestDST(t *testing.T) {
)
}

// Missing rows. Double check it's not due to dropping WAL entries
// (which is valid). If the missing writes were written to the WAL then
// this is real data loss.
// TODO(asubiotto): The verification code is a little brittle. It is
// possible that not all rows written are reflected in a read if a
// restart happened with WAL entries pending. We attempt to verify by
// subsequently comparing the number of rows read to number of rows
// committed to WAL.
// However, for some reason we seem to commit fewer entries than are
// read.
// The ColumnStore.Close call below should make this impossible since
// it should drain any pending WAL entries (and there are no writes in
// flight). This requires some more investigation and thought. For now,
// this check is fine.
// One option is to make sure no missing writes (i.e. any remaining
// timestamps in expectedTimestamps) are contained in
// logStoreWrapper.acc.timestamps. I'm worried this would hide real
// data loss since the rows written to the log store seem to be much
// less than expected.

// Drain currently pending WAL entries by closing.
require.NoError(t, c.Close())

require.Equal(
t,
logStoreWrapper.acc.count,
timestampSum.count,
"number of rows mismatch, wrote %d, committed %d to WAL, and read %d\n"+
"timestamps that were expected and not found: %v\n"+
"timestamps that were encountered more than once: %v",
w.timestampSum.count, logStoreWrapper.acc.count, timestampSum.count,
expectedTimestamps, nonUnique,
)
}
if w.timestampSum.sum != timestampSum.sum {
require.Equal(
// Delete the timestamps that were not committed over the lifetime of
// the test. These were lost.
for _, v := range logStoreWrapper.acc.timestamps {
delete(expectedTimestamps, v)
}

require.Zero(
t,
logStoreWrapper.acc.sum,
timestampSum.sum,
len(expectedTimestamps),
"number of rows mismatch, wrote %d, and read %d. %d could not be committed to WAL\n"+
"timestamps that were expected and not found: %v\n",
w.timestampSum.count, timestampSum.count, logStoreWrapper.acc.count, expectedTimestamps,
)
}
}
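
For the special case mentioned in the testLogStore comment above, where even
successful commits need to be tracked, the DST can flip accOnNoError around the
risky window. A hedged sketch of that usage (simulateHardShutdown is a
hypothetical stand-in for the test's actual fault injection, which is not part
of this diff):

// markUnsafeCommitWindow sketches how the accOnNoError flag could be used:
// while it is set, StoreLogs accumulates entries even on successful commit,
// covering cases such as a hard shutdown copying the WAL's directory out from
// under the store, where the durability of those commits is uncertain.
func markUnsafeCommitWindow(s *testLogStore, simulateHardShutdown func()) {
	s.accOnNoError.Store(true)
	defer s.accOnNoError.Store(false)
	simulateHardShutdown()
}
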
18 changes: 18 additions & 0 deletions wal/wal.go
@@ -143,6 +143,7 @@ type FileWAL struct {

newLogStoreWrapper func(wal.LogStore) wal.LogStore
ticker Ticker
testingDroppedLogs func([]types.LogEntry)
}

type logRequest struct {
@@ -217,6 +218,14 @@ func WithTestingLoopTicker(t Ticker) Option {
}
}

// WithTestingCallbackWithDroppedLogsOnClose registers a callback that is
// invoked when the WAL times out on close, with all the entries that could
// not be written.
func WithTestingCallbackWithDroppedLogsOnClose(cb func([]types.LogEntry)) Option {
return func(w *FileWAL) {
w.testingDroppedLogs = cb
}
}

func Open(
logger log.Logger,
path string,
@@ -305,6 +314,15 @@ func (w *FileWAL) run(ctx context.Context) {
level.Error(w.logger).Log(
"msg", "WAL timed out attempting to close",
)
if w.testingDroppedLogs != nil {
batch := make([]types.LogEntry, 0, n)
w.protected.Lock()
for _, v := range w.protected.queue {
batch = append(batch, types.LogEntry{Index: v.tx, Data: v.data})
}
w.protected.Unlock()
w.testingDroppedLogs(batch)
}
return
}
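
Tying the two files together: the callback registered via
wal.WithTestingCallbackWithDroppedLogsOnClose receives exactly the entries
still queued when the close timeout fires; in the DST this is
testLogStore.droppedLogsCallback, which feeds accumulateLogs. A minimal,
hypothetical consumer (illustrative only, relying just on the types.LogEntry
values handed to the callback) might look like this:

// newDroppedLogsRecorder returns a callback suitable for
// WithTestingCallbackWithDroppedLogsOnClose plus an accessor for the entries
// it collected. No locking is needed: the WAL invokes the callback once, from
// its run goroutine, just before returning.
func newDroppedLogsRecorder() (callback func([]types.LogEntry), dropped func() []types.LogEntry) {
	var acc []types.LogEntry
	callback = func(logs []types.LogEntry) {
		acc = append(acc, logs...)
	}
	dropped = func() []types.LogEntry { return acc }
	return callback, dropped
}

Such a recorder could then be passed as the droppedLogsCallback argument that
newStore now accepts in dst/dst_test.go.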
