From cc5e9a81bfdbf2cd165c3cdb4f10a926cee8bcfa Mon Sep 17 00:00:00 2001 From: Bilal Akhtar Date: Wed, 17 May 2023 08:46:17 -0700 Subject: [PATCH] *: add IngestAndExcise operation This change adds an IngestAndExcise operation that does the below additional things alongside a regular ingestion: 1) It ingests some SharedSSTMeta files, which are provider-backed sstables that could be owned by other nodes. 2) It excises existing sstables within the provided excise span (within which all sstables from 1 must fit) by creating new virtual sstables that exclude keys from the excise span. While this change can be implemented independently of #2455, some of the end-to-end tests in future changes will rely on both that and this. Fixes #2520. --- compaction.go | 17 +- data_test.go | 28 +- flushable_test.go | 7 +- ingest.go | 600 +++++++++++++++++++++++++++--- ingest_test.go | 277 +++++++++++++- internal/manifest/l0_sublevels.go | 10 + internal/manifest/version.go | 20 +- open.go | 7 +- scan_internal.go | 8 +- table_cache.go | 26 ++ table_stats.go | 5 + testdata/excise | 165 ++++++++ version_set.go | 8 + 13 files changed, 1103 insertions(+), 75 deletions(-) create mode 100644 testdata/excise diff --git a/compaction.go b/compaction.go index 1c17d2423d..a7519b601f 100644 --- a/compaction.go +++ b/compaction.go @@ -2558,9 +2558,20 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) { info.Duration = d.timeNow().Sub(startTime) if err == nil { d.mu.versions.logLock() - err = d.mu.versions.logAndApply(jobID, ve, c.metrics, false /* forceRotation */, func() []compactionInfo { - return d.getInProgressCompactionInfoLocked(c) - }) + // Confirm if any of this compaction's inputs were deleted while this + // compaction was ongoing. + for i := range c.inputs { + c.inputs[i].files.Each(func(m *manifest.FileMetadata) { + if m.Deleted { + err = firstError(err, errors.New("pebble: file deleted by a concurrent operation, will retry compaction")) + } + }) + } + if err == nil { + err = d.mu.versions.logAndApply(jobID, ve, c.metrics, false /* forceRotation */, func() []compactionInfo { + return d.getInProgressCompactionInfoLocked(c) + }) + } if err != nil { // TODO(peter): untested. for _, f := range pendingOutputs { diff --git a/data_test.go b/data_test.go index 361b677b1c..873e682e04 100644 --- a/data_test.go +++ b/data_test.go @@ -1167,6 +1167,32 @@ func (d *DB) waitTableStats() { } } +func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { + var exciseSpan KeyRange + paths := make([]string, 0, len(td.CmdArgs)) + for i, arg := range td.CmdArgs { + switch td.CmdArgs[i].Key { + case "excise": + if len(td.CmdArgs[i].Vals) != 1 { + return errors.New("expected 2 values for excise separated by -, eg. ingest-and-excise foo1 excise=\"start-end\"") + } + fields := strings.Split(td.CmdArgs[i].Vals[0], "-") + if len(fields) != 2 { + return errors.New("expected 2 values for excise separated by -, eg. 
ingest-and-excise foo1 excise=\"start-end\"") + } + exciseSpan.Start = []byte(fields[0]) + exciseSpan.End = []byte(fields[1]) + default: + paths = append(paths, arg.String()) + } + } + + if _, err := d.IngestAndExcise(paths, nil /* shared */, exciseSpan); err != nil { + return err + } + return nil +} + func runIngestCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { paths := make([]string, 0, len(td.CmdArgs)) for _, arg := range td.CmdArgs { @@ -1205,7 +1231,7 @@ func runForceIngestCmd(td *datadriven.TestData, d *DB) error { *fileMetadata, ) (int, error) { return level, nil - }) + }, nil, KeyRange{}) return err } diff --git a/flushable_test.go b/flushable_test.go index 7c5273da13..b16d59531c 100644 --- a/flushable_test.go +++ b/flushable_test.go @@ -58,9 +58,10 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { // We can reuse the ingestLoad function for this test even if we're // not actually ingesting a file. - meta, paths, err := ingestLoad( - d.opts, d.FormatMajorVersion(), paths, d.cacheID, pendingOutputs, + lr, err := ingestLoad( + d.opts, d.FormatMajorVersion(), paths, nil /* shared */, d.cacheID, pendingOutputs, d.objProvider, ) + meta := lr.localMeta if err != nil { panic(err) } @@ -70,7 +71,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { } // Verify the sstables do not overlap. - if err := ingestSortAndVerify(d.cmp, meta, paths); err != nil { + if err := ingestSortAndVerify(d.cmp, lr, KeyRange{}); err != nil { panic("unsorted sstables") } diff --git a/ingest.go b/ingest.go index 617c8b268f..47ea123441 100644 --- a/ingest.go +++ b/ingest.go @@ -6,6 +6,7 @@ package pebble import ( "context" + "fmt" "sort" "time" @@ -33,6 +34,9 @@ func sstableKeyCompare(userCmp Compare, a, b InternalKey) int { return 0 } +// KeyRange exports the KeyRange type. +type KeyRange = manifest.UserKeyRange + func ingestValidateKey(opts *Options, key *InternalKey) error { if key.Kind() == InternalKeyKindInvalid { return base.CorruptionErrorf("pebble: external sstable has corrupted key: %s", @@ -45,15 +49,67 @@ func ingestValidateKey(opts *Options, key *InternalKey) error { return nil } +// ingestLoad1Shared loads the fileMetadata for one shared sstable. +func ingestLoad1Shared( + opts *Options, sm SharedSSTMeta, fileNum base.DiskFileNum, size int64, +) (*fileMetadata, error) { + // Don't load table stats. Doing a round trip to shared storage, one SST + // at a time is not worth it as it slows down ingestion. 
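+	// Instead, synthesize a virtual fileMetadata directly from the SharedSSTMeta:
+	// copy its point/range-key bounds and stamp them with the fixed per-level
+	// sequence numbers (SeqNumL5*/SeqNumL6*) implied by sm.Level.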
+ meta := &fileMetadata{} + meta.FileNum = fileNum.FileNum() + meta.CreationTime = time.Now().Unix() + meta.Virtual = true + meta.InitProviderBacking(fileNum, uint64(size)) + meta.Size = sm.Size + if sm.LargestRangeKey.Valid() && sm.LargestRangeKey.UserKey != nil { + meta.HasRangeKeys = true + meta.SmallestRangeKey = sm.SmallestRangeKey + meta.LargestRangeKey = sm.LargestRangeKey + switch sm.Level { + case 5: + meta.SmallestRangeKey.SetSeqNum(base.SeqNumL5RangeKeySet) + meta.LargestRangeKey.SetSeqNum(base.SeqNumL5RangeKeyUnsetDel) + case 6: + meta.SmallestRangeKey.SetSeqNum(base.SeqNumL6RangeKey) + meta.LargestRangeKey.SetSeqNum(base.SeqNumL6RangeKey) + default: + panic(fmt.Sprintf("unexpected level in shared meta: %d", sm.Level)) + } + meta.ExtendRangeKeyBounds(opts.Comparer.Compare, meta.SmallestRangeKey, meta.LargestRangeKey) + } + if sm.LargestPointKey.Valid() && sm.LargestPointKey.UserKey != nil { + meta.HasPointKeys = true + meta.SmallestPointKey = sm.SmallestPointKey + meta.LargestPointKey = sm.LargestPointKey + switch sm.Level { + case 5: + meta.SmallestPointKey.SetSeqNum(base.SeqNumL5Point) + meta.LargestPointKey.SetSeqNum(base.SeqNumL5RangeDel) + case 6: + meta.SmallestPointKey.SetSeqNum(base.SeqNumL6Point) + meta.LargestPointKey.SetSeqNum(base.SeqNumL6Point) + default: + panic(fmt.Sprintf("unexpected level in shared meta: %d", sm.Level)) + } + meta.ExtendPointKeyBounds(opts.Comparer.Compare, meta.SmallestPointKey, meta.LargestPointKey) + } + meta.SmallestSeqNum = meta.Smallest.SeqNum() + meta.LargestSeqNum = meta.Largest.SeqNum() + if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil { + return nil, err + } + return meta, nil +} + func ingestLoad1( opts *Options, fmv FormatMajorVersion, path string, cacheID uint64, fileNum base.DiskFileNum, ) (*fileMetadata, error) { + var readable objstorage.Readable f, err := opts.FS.Open(path) if err != nil { return nil, err } - - readable, err := sstable.NewSimpleReadable(f) + readable, err = sstable.NewSimpleReadable(f) if err != nil { return nil, err } @@ -195,22 +251,75 @@ func ingestLoad1( return meta, nil } +type ingestLoadResult struct { + localMeta, sharedMeta []*fileMetadata + localPaths []string + sharedLevels []uint8 +} + func ingestLoad( - opts *Options, fmv FormatMajorVersion, paths []string, cacheID uint64, pending []base.DiskFileNum, -) ([]*fileMetadata, []string, error) { + opts *Options, + fmv FormatMajorVersion, + paths []string, + shared []SharedSSTMeta, + cacheID uint64, + pending []base.DiskFileNum, + provider objstorage.Provider, +) (ingestLoadResult, error) { meta := make([]*fileMetadata, 0, len(paths)) newPaths := make([]string, 0, len(paths)) for i := range paths { m, err := ingestLoad1(opts, fmv, paths[i], cacheID, pending[i]) if err != nil { - return nil, nil, err + return ingestLoadResult{}, err } if m != nil { meta = append(meta, m) newPaths = append(newPaths, paths[i]) } } - return meta, newPaths, nil + if len(shared) == 0 { + return ingestLoadResult{localMeta: meta, localPaths: newPaths}, nil + } + sharedMeta := make([]*fileMetadata, 0, len(shared)) + levels := make([]uint8, 0, len(shared)) + var sharedObjs []objstorage.SharedObjectToAttach + for i := range shared { + backing, err := shared[i].Backing.Get() + if err != nil { + return ingestLoadResult{}, err + } + sharedObjs = append(sharedObjs, objstorage.SharedObjectToAttach{ + FileNum: pending[len(paths)+i], + FileType: fileTypeTable, + Backing: backing, + }) + } + sharedObjMetas, err := provider.AttachSharedObjects(sharedObjs) + 
if err != nil { + return ingestLoadResult{}, err + } + for i := range shared { + size, err := provider.Size(sharedObjMetas[i]) + if err != nil { + return ingestLoadResult{}, err + } + m, err := ingestLoad1Shared(opts, shared[i], pending[len(paths)+i], size) + if err != nil { + return ingestLoadResult{}, err + } + if m != nil { + sharedMeta = append(sharedMeta, m) + levels = append(levels, shared[i].Level) + } + } + result := ingestLoadResult{ + localMeta: meta, + sharedMeta: sharedMeta, + localPaths: newPaths, + sharedLevels: levels, + } + return result, nil } // Struct for sorting metadatas by smallest user keys, while ensuring the @@ -235,19 +344,27 @@ func (m metaAndPaths) Swap(i, j int) { m.paths[i], m.paths[j] = m.paths[j], m.paths[i] } -func ingestSortAndVerify(cmp Compare, meta []*fileMetadata, paths []string) error { - if len(meta) <= 1 { +func ingestSortAndVerify(cmp Compare, lr ingestLoadResult, exciseSpan KeyRange) error { + // Verify that all the shared files (i.e. files in sharedMeta) + // fit within the exciseSpan. + for i := range lr.sharedMeta { + f := lr.sharedMeta[i] + if !exciseSpan.Contains(cmp, f.Smallest.UserKey) || !exciseSpan.Contains(cmp, f.Largest.UserKey) { + return errors.New("pebble: shared file outside of excise span") + } + } + if len(lr.localMeta) <= 1 || len(lr.localPaths) <= 1 { return nil } sort.Sort(&metaAndPaths{ - meta: meta, - paths: paths, + meta: lr.localMeta, + paths: lr.localPaths, cmp: cmp, }) - for i := 1; i < len(meta); i++ { - if sstableKeyCompare(cmp, meta[i-1].Largest, meta[i].Smallest) >= 0 { + for i := 1; i < len(lr.localPaths); i++ { + if sstableKeyCompare(cmp, lr.localMeta[i-1].Largest, lr.localMeta[i].Smallest) >= 0 { return errors.New("pebble: external sstables have overlapping ranges") } } @@ -310,7 +427,8 @@ func ingestMemtableOverlaps(cmp Compare, mem flushable, meta []*fileMetadata) bo } for _, m := range meta { - if overlapWithIterator(iter, &rangeDelIter, rkeyIter, m, cmp) { + kr := internalKeyRange{smallest: m.Smallest, largest: m.Largest} + if overlapWithIterator(iter, &rangeDelIter, rkeyIter, kr, cmp) { closeIters() return true } @@ -362,21 +480,25 @@ func ingestUpdateSeqNum( return nil } +type internalKeyRange struct { + smallest, largest InternalKey +} + func overlapWithIterator( iter internalIterator, rangeDelIter *keyspan.FragmentIterator, rkeyIter keyspan.FragmentIterator, - meta *fileMetadata, + keyRange internalKeyRange, cmp Compare, ) bool { // Check overlap with point operations. // // When using levelIter, it seeks to the SST whose boundaries - // contain meta.Smallest.UserKey(S). + // contain keyRange.smallest.UserKey(S). // It then tries to find a point in that SST that is >= S. // If there's no such point it means the SST ends in a tombstone in which case // levelIter.SeekGE generates a boundary range del sentinel. - // The comparison of this boundary with meta.Largest(L) below + // The comparison of this boundary with keyRange.largest(L) below // is subtle but maintains correctness. // 1) boundary < L, // since boundary is also > S (initial seek), @@ -388,9 +510,9 @@ func overlapWithIterator( // means boundary < L and hence is similar to 1). // 4) boundary == L and L is sentinel, // we'll always overlap since for any values of i,j ranges [i, k) and [j, k) always overlap. 
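+	// For example, in case 4 above: [b, f) and [d, f) overlap for any b, d < f,
+	// so reporting overlap is always correct.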
- key, _ := iter.SeekGE(meta.Smallest.UserKey, base.SeekGEFlagsNone) + key, _ := iter.SeekGE(keyRange.smallest.UserKey, base.SeekGEFlagsNone) if key != nil { - c := sstableKeyCompare(cmp, *key, meta.Largest) + c := sstableKeyCompare(cmp, *key, keyRange.largest) if c <= 0 { return true } @@ -402,7 +524,7 @@ func overlapWithIterator( computeOverlapWithSpans := func(rIter keyspan.FragmentIterator) bool { // NB: The spans surfaced by the fragment iterator are non-overlapping. - span := rIter.SeekLT(meta.Smallest.UserKey) + span := rIter.SeekLT(keyRange.smallest.UserKey) if span == nil { span = rIter.Next() } @@ -411,13 +533,13 @@ func overlapWithIterator( continue } key := span.SmallestKey() - c := sstableKeyCompare(cmp, key, meta.Largest) + c := sstableKeyCompare(cmp, key, keyRange.largest) if c > 0 { // The start of the span is after the largest key in the // ingested table. return false } - if cmp(span.End, meta.Smallest.UserKey) > 0 { + if cmp(span.End, keyRange.smallest.UserKey) > 0 { // The end of the span is greater than the smallest in the // table. Note that the span end key is exclusive, thus ">0" // instead of ">=0". @@ -537,7 +659,11 @@ func ingestTargetLevel( v.L0Sublevels.Levels[subLevel].Iter(), manifest.Level(0), manifest.KeyTypeRange, ) - overlap := overlapWithIterator(iter, &rangeDelIter, &levelIter, meta, cmp) + kr := internalKeyRange{ + smallest: meta.Smallest, + largest: meta.Largest, + } + overlap := overlapWithIterator(iter, &rangeDelIter, &levelIter, kr, cmp) err := iter.Close() // Closes range del iter as well. err = firstError(err, levelIter.Close()) if err != nil { @@ -563,7 +689,11 @@ func ingestTargetLevel( v.Levels[level].Iter(), manifest.Level(level), manifest.KeyTypeRange, ) - overlap := overlapWithIterator(levelIter, &rangeDelIter, rkeyLevelIter, meta, cmp) + kr := internalKeyRange{ + smallest: meta.Smallest, + largest: meta.Largest, + } + overlap := overlapWithIterator(levelIter, &rangeDelIter, rkeyLevelIter, kr, cmp) err := levelIter.Close() // Closes range del iter as well. err = firstError(err, rkeyLevelIter.Close()) if err != nil { @@ -654,7 +784,7 @@ func (d *DB) Ingest(paths []string) error { if d.opts.ReadOnly { return ErrReadOnly } - _, err := d.ingest(paths, ingestTargetLevel) + _, err := d.ingest(paths, ingestTargetLevel, nil /* shared */, KeyRange{}) return err } @@ -683,7 +813,27 @@ func (d *DB) IngestWithStats(paths []string) (IngestOperationStats, error) { if d.opts.ReadOnly { return IngestOperationStats{}, ErrReadOnly } - return d.ingest(paths, ingestTargetLevel) + return d.ingest(paths, ingestTargetLevel, nil /* shared */, KeyRange{}) +} + +// IngestAndExcise does the same as IngestWithStats, and additionally accepts +// a list of shared files to ingest that can be read from a shared.Storage +// through a Provider. All the shared files must live within exciseSpan, and +// any existing keys in exciseSpan are deleted by turning existing sstables into +// virtual sstables (if not virtual already) and shrinking their spans to +// exclude exciseSpan. +// +// Panics if this DB instance was not instantiated with a shared.Storage. +func (d *DB) IngestAndExcise( + paths []string, shared []SharedSSTMeta, exciseSpan KeyRange, +) (IngestOperationStats, error) { + if err := d.closed.Load(); err != nil { + panic(err) + } + if d.opts.ReadOnly { + return IngestOperationStats{}, ErrReadOnly + } + return d.ingest(paths, ingestTargetLevel, shared, exciseSpan) } // Both DB.mu and commitPipeline.mu must be held while this is called. 
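// A minimal usage sketch of the IngestAndExcise API added above, assuming the
// caller has already obtained provider-backed SharedSSTMeta handles that all
// fall inside the excise span; localPaths and remoteShared are hypothetical:
//
//	span := pebble.KeyRange{Start: []byte("d"), End: []byte("ee")}
//	stats, err := db.IngestAndExcise(localPaths, remoteShared, span)
//	if err != nil {
//		return err
//	}
//	// On success, any pre-existing keys in [span.Start, span.End) have been
//	// excised, and the shared files are installed at the levels recorded in
//	// their SharedSSTMeta.
//	_ = stats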
@@ -794,7 +944,10 @@ func (d *DB) handleIngestAsFlushable(meta []*fileMetadata, seqNum uint64) error } func (d *DB) ingest( - paths []string, targetLevelFunc ingestTargetLevelFunc, + paths []string, + targetLevelFunc ingestTargetLevelFunc, + shared []SharedSSTMeta, + exciseSpan KeyRange, ) (IngestOperationStats, error) { // Allocate file numbers for all of the files being ingested and mark them as // pending in order to prevent them from being deleted. Note that this causes @@ -802,27 +955,30 @@ func (d *DB) ingest( // ordering. The sorting of L0 tables by sequence number avoids relying on // that (busted) invariant. d.mu.Lock() - pendingOutputs := make([]base.DiskFileNum, len(paths)) + pendingOutputs := make([]base.DiskFileNum, len(paths)+len(shared)) for i := range paths { pendingOutputs[i] = d.mu.versions.getNextFileNum().DiskFileNum() } + for i := range shared { + pendingOutputs[len(paths)+i] = d.mu.versions.getNextFileNum().DiskFileNum() + } jobID := d.mu.nextJobID d.mu.nextJobID++ d.mu.Unlock() // Load the metadata for all of the files being ingested. This step detects // and elides empty sstables. - meta, paths, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, d.cacheID, pendingOutputs) + loadResult, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, shared, d.cacheID, pendingOutputs, d.ObjProvider()) if err != nil { return IngestOperationStats{}, err } - if len(meta) == 0 { + if len(loadResult.localMeta) == 0 && len(loadResult.sharedMeta) == 0 { // All of the sstables to be ingested were empty. Nothing to do. return IngestOperationStats{}, nil } // Verify the sstables do not overlap. - if err := ingestSortAndVerify(d.cmp, meta, paths); err != nil { + if err := ingestSortAndVerify(d.cmp, loadResult, exciseSpan); err != nil { return IngestOperationStats{}, err } @@ -831,7 +987,7 @@ func (d *DB) ingest( // (e.g. because the files reside on a different filesystem), ingestLink will // fall back to copying, and if that fails we undo our work and return an // error. - if err := ingestLink(jobID, d.opts, d.objProvider, paths, meta); err != nil { + if err := ingestLink(jobID, d.opts, d.objProvider, loadResult.localPaths, loadResult.localMeta); err != nil { return IngestOperationStats{}, err } // Make the new tables durable. We need to do this at some point before we @@ -844,11 +1000,12 @@ func (d *DB) ingest( // metaFlushableOverlaps is a slice parallel to meta indicating which of the // ingested sstables overlap some table in the flushable queue. It's used to // approximate ingest-into-L0 stats when using flushable ingests. - metaFlushableOverlaps := make([]bool, len(meta)) + metaFlushableOverlaps := make([]bool, len(loadResult.localMeta)+len(loadResult.sharedMeta)) var mem *flushableEntry var mut *memTable // asFlushable indicates whether the sstable was ingested as a flushable. var asFlushable bool + var overlapWithExciseSpan bool prepare := func(seqNum uint64) { // Note that d.commit.mu is held by commitPipeline when calling prepare. @@ -865,12 +1022,17 @@ func (d *DB) ingest( iter := m.newIter(nil) rangeDelIter := m.newRangeDelIter(nil) rkeyIter := m.newRangeKeyIter(nil) - for i := range meta { + + checkForOverlap := func(i int, meta *fileMetadata) { if metaFlushableOverlaps[i] { // This table already overlapped a more recent flushable. 
- continue + return } - if overlapWithIterator(iter, &rangeDelIter, rkeyIter, meta[i], d.cmp) { + kr := internalKeyRange{ + smallest: meta.Smallest, + largest: meta.Largest, + } + if overlapWithIterator(iter, &rangeDelIter, rkeyIter, kr, d.cmp) { // If this is the first table to overlap a flushable, save // the flushable. This ingest must be ingested or flushed // after it. @@ -880,6 +1042,24 @@ func (d *DB) ingest( metaFlushableOverlaps[i] = true } } + for i := range loadResult.localMeta { + checkForOverlap(i, loadResult.localMeta[i]) + } + for i := range loadResult.sharedMeta { + checkForOverlap(len(loadResult.localMeta)+i, loadResult.sharedMeta[i]) + } + if exciseSpan.Valid() { + kr := internalKeyRange{ + smallest: base.MakeInternalKey(exciseSpan.Start, InternalKeySeqNumMax, InternalKeyKindMax), + largest: base.MakeExclusiveSentinelKey(InternalKeyKindRangeDelete, exciseSpan.End), + } + if overlapWithIterator(iter, &rangeDelIter, rkeyIter, kr, d.cmp) { + if mem == nil { + mem = m + } + overlapWithExciseSpan = true + } + } err := iter.Close() if rangeDelIter != nil { err = firstError(err, rangeDelIter.Close()) @@ -910,9 +1090,16 @@ func (d *DB) ingest( // The ingestion overlaps with some entry in the flushable queue. if d.mu.formatVers.vers < FormatFlushableIngest || d.opts.Experimental.DisableIngestAsFlushable() || + len(shared) > 0 || overlapWithExciseSpan || (len(d.mu.mem.queue) > d.opts.MemTableStopWritesThreshold-1) { // We're not able to ingest as a flushable, // so we must synchronously flush. + // + // TODO(bilal): Currently, if any of the files being ingested are shared or + // there's overlap between the memtable and an excise span, we cannot use + // flushable ingests and need to wait synchronously. Either remove this + // caveat by fleshing out flushable ingest logic to also account for these + // cases, or remove this TODO. if mem.flushable == d.mu.mem.mutable { err = d.makeRoomForWrite(nil) } @@ -932,7 +1119,7 @@ func (d *DB) ingest( // Since there aren't too many memtables already queued up, we can // slide the ingested sstables on top of the existing memtables. asFlushable = true - err = d.handleIngestAsFlushable(meta, seqNum) + err = d.handleIngestAsFlushable(loadResult.localMeta, seqNum) } var ve *versionEdit @@ -945,12 +1132,15 @@ func (d *DB) ingest( return } - // Update the sequence number for all of the sstables in the + // Update the sequence number for all local sstables in the // metadata. Writing the metadata to the manifest when the // version edit is applied is the mechanism that persists the // sequence number. The sstables themselves are left unmodified. + // + // For shared sstables, we do not need to update sequence numbers. These + // sequence numbers are already set in ingestLoad. if err = ingestUpdateSeqNum( - d.cmp, d.opts.Comparer.FormatKey, seqNum, meta, + d.cmp, d.opts.Comparer.FormatKey, seqNum, loadResult.localMeta, ); err != nil { if mut != nil { mut.writerUnref() @@ -966,28 +1156,34 @@ func (d *DB) ingest( // Assign the sstables to the correct level in the LSM and apply the // version edit. 
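+		// ingestApply also performs the excise itself: any existing file that
+		// overlaps exciseSpan is replaced with virtual sstables excluding the span.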
- ve, err = d.ingestApply(jobID, meta, targetLevelFunc, mut) + ve, err = d.ingestApply(jobID, loadResult, targetLevelFunc, mut, exciseSpan) } - d.commit.AllocateSeqNum(len(meta), prepare, apply) + d.commit.AllocateSeqNum(len(loadResult.localPaths), prepare, apply) if err != nil { - if err2 := ingestCleanup(d.objProvider, meta); err2 != nil { + if err2 := ingestCleanup(d.objProvider, loadResult.localMeta); err2 != nil { d.opts.Logger.Infof("ingest cleanup failed: %v", err2) } } else { // Since we either created a hard link to the ingesting files, or copied // them over, it is safe to remove the originals paths. - for _, path := range paths { + for _, path := range loadResult.localPaths { if err2 := d.opts.FS.Remove(path); err2 != nil { d.opts.Logger.Infof("ingest failed to remove original file: %s", err2) } } } + // NB: Shared-sstable-only ingestions do not assign a sequence number to + // any sstables. + globalSeqNum := uint64(0) + if len(loadResult.localMeta) > 0 { + globalSeqNum = loadResult.localMeta[0].SmallestSeqNum + } info := TableIngestInfo{ JobID: jobID, - GlobalSeqNum: meta[0].SmallestSeqNum, + GlobalSeqNum: globalSeqNum, Err: err, flushable: asFlushable, } @@ -1005,16 +1201,17 @@ func (d *DB) ingest( if e.Level == 0 { stats.ApproxIngestedIntoL0Bytes += e.Meta.Size } - if metaFlushableOverlaps[i] { + if i < len(metaFlushableOverlaps) && metaFlushableOverlaps[i] { stats.MemtableOverlappingFiles++ } } } else if asFlushable { + // NB: If asFlushable == true, there are no shared sstables. info.Tables = make([]struct { TableInfo Level int - }, len(meta)) - for i, f := range meta { + }, len(loadResult.localMeta)) + for i, f := range loadResult.localMeta { info.Tables[i].Level = -1 info.Tables[i].TableInfo = f.TableInfo() stats.Bytes += f.Size @@ -1037,6 +1234,245 @@ func (d *DB) ingest( return stats, err } +// excise updates ve to include a replacement of the file m with new virtual +// sstables that exclude exciseSpan, returning a slice of newly-created files if +// any. If the entirety of m is deleted by exciseSpan, no new sstables are added +// and m is deleted. Note that ve is updated in-place. +// +// The manifest lock must be held when calling this method. +func (d *DB) excise( + exciseSpan KeyRange, m *fileMetadata, ve *versionEdit, level int, +) ([]manifest.NewFileEntry, error) { + numCreatedFiles := 0 + // Check if there's actually an overlap between m and exciseSpan. + if d.cmp(exciseSpan.Start, m.Largest.UserKey) > 0 || d.cmp(exciseSpan.End, m.Smallest.UserKey) < 0 || + (m.Largest.IsExclusiveSentinel() && d.cmp(m.Largest.UserKey, exciseSpan.Start) == 0) { + return nil, nil + } + ve.DeletedFiles[deletedFileEntry{ + Level: level, + FileNum: m.FileNum, + }] = m + // Fast path: m sits entirely within the exciseSpan, so just delete it. + if exciseSpan.Contains(d.cmp, m.Smallest.UserKey) && (exciseSpan.Contains(d.cmp, m.Largest.UserKey) || + (m.Largest.IsExclusiveSentinel() && d.cmp(m.Largest.UserKey, exciseSpan.End) == 0)) { + return nil, nil + } + var iter internalIterator + var rangeDelIter keyspan.FragmentIterator + var rangeKeyIter keyspan.FragmentIterator + backingTableCreated := false + // Create a file to the left of the excise span, if necessary. + // The bounds of this file will be [m.Smallest, exciseSpan.Start]. 
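+	// The left file's largest point key becomes whichever is larger: the last
+	// point key strictly before exciseSpan.Start, or an exclusive RANGEDEL
+	// sentinel at min(last rangedel end, exciseSpan.Start). Range keys are
+	// truncated analogously below.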
+ if d.cmp(m.Smallest.UserKey, exciseSpan.Start) < 0 { + leftFile := &fileMetadata{ + Virtual: true, + FileBacking: m.FileBacking, + FileNum: d.mu.versions.getNextFileNum(), + } + leftFile.Smallest = m.Smallest + leftFile.SmallestRangeKey = m.SmallestRangeKey + leftFile.SmallestPointKey = m.SmallestPointKey + leftFile.HasPointKeys = m.HasPointKeys + leftFile.HasRangeKeys = m.HasRangeKeys + if m.HasPointKeys && exciseSpan.Contains(d.cmp, m.SmallestPointKey.UserKey) { + // This file will not contain any point keys, but will contain range keys. + leftFile.HasPointKeys = false + leftFile.Smallest = m.SmallestRangeKey + } else if m.HasRangeKeys && exciseSpan.Contains(d.cmp, m.SmallestRangeKey.UserKey) { + leftFile.HasRangeKeys = false + leftFile.Smallest = m.SmallestPointKey + } + if leftFile.HasPointKeys { + var err error + iter, rangeDelIter, err = d.newIters(context.TODO(), m, &IterOptions{level: manifest.Level(level)}, internalIterOpts{}) + if err != nil { + return nil, err + } + var key *InternalKey + if iter != nil { + defer iter.Close() + key, _ = iter.SeekLT(exciseSpan.Start, base.SeekLTFlagsNone) + } else { + iter = emptyIter + } + // Store the min of (exciseSpan.Start, rdel.End) in lastRangeDel. This + // needs to be a copy if the key is owned by the range del iter. + var lastRangeDel []byte + if rangeDelIter != nil { + defer rangeDelIter.Close() + rdel := rangeDelIter.SeekLT(exciseSpan.Start) + if rdel != nil { + lastRangeDel = append(lastRangeDel[:0], rdel.End...) + if d.cmp(lastRangeDel, exciseSpan.Start) > 0 { + lastRangeDel = exciseSpan.Start + } + } + } else { + rangeDelIter = emptyKeyspanIter + } + leftFile.HasPointKeys = key != nil || lastRangeDel != nil + if key != nil && (lastRangeDel == nil || d.cmp(lastRangeDel, key.UserKey) <= 0) { + leftFile.LargestPointKey = key.Clone() + } else if lastRangeDel != nil { + // key == nil || lastRangeDel > key.UserKey. + leftFile.LargestPointKey = base.MakeExclusiveSentinelKey(InternalKeyKindRangeDelete, lastRangeDel) + } + leftFile.Largest = leftFile.LargestPointKey + } + if leftFile.HasRangeKeys { + var err error + rangeKeyIter, err = d.tableNewRangeKeyIter(m, keyspan.SpanIterOptions{}) + if err != nil { + return nil, err + } + // Store the min of (exciseSpan.Start, rkey.End) in lastRangeKey. This + // needs to be a copy if the key is owned by the range key iter. + var lastRangeKey []byte + var lastRangeKeyKind InternalKeyKind + defer rangeKeyIter.Close() + rkey := rangeKeyIter.SeekLT(exciseSpan.Start) + if rkey != nil { + lastRangeKey = append(lastRangeKey[:0], rkey.End...) 
+ if d.cmp(lastRangeKey, exciseSpan.Start) > 0 { + lastRangeKey = exciseSpan.Start + } + lastRangeKeyKind = rkey.Keys[0].Kind() + } + leftFile.HasRangeKeys = lastRangeKey != nil + if leftFile.HasRangeKeys { + leftFile.LargestRangeKey = base.MakeExclusiveSentinelKey(lastRangeKeyKind, lastRangeKey) + if !leftFile.HasPointKeys || base.InternalCompare(d.cmp, leftFile.LargestPointKey, leftFile.LargestRangeKey) < 0 { + leftFile.Largest = leftFile.LargestRangeKey + } + } + } + if leftFile.HasRangeKeys || leftFile.HasPointKeys { + var err error + leftFile.Size, err = d.tableCache.estimateSize(m, leftFile.Smallest.UserKey, leftFile.Largest.UserKey) + if err != nil { + return nil, err + } + if err := leftFile.Validate(d.cmp, d.opts.Comparer.FormatKey); err != nil { + return nil, err + } + ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: leftFile}) + ve.CreatedBackingTables = append(ve.CreatedBackingTables, leftFile.FileBacking) + backingTableCreated = true + numCreatedFiles++ + } + } + // Create a file to the right, if necessary. + if d.cmp(exciseSpan.End, m.Largest.UserKey) > 0 || (d.cmp(exciseSpan.End, m.Largest.UserKey) == 0 && m.Largest.IsExclusiveSentinel()) { + // No key exists to the right of the excise span in this file. + return ve.NewFiles[len(ve.NewFiles)-numCreatedFiles:], nil + } + // Create a new file, rightFile, between [exciseSpan.End, m.Largest]. + rightFile := &fileMetadata{ + Virtual: true, + FileBacking: m.FileBacking, + FileNum: d.mu.versions.getNextFileNum(), + } + rightFile.Largest = m.Largest + rightFile.LargestRangeKey = m.LargestRangeKey + rightFile.LargestPointKey = m.LargestPointKey + rightFile.HasPointKeys = m.HasPointKeys + rightFile.HasRangeKeys = m.HasRangeKeys + if m.HasPointKeys && exciseSpan.Contains(d.cmp, m.LargestPointKey.UserKey) { + // This file will not contain any point keys, but will contain range keys. + rightFile.HasPointKeys = false + rightFile.Largest = m.LargestRangeKey + } else if m.HasRangeKeys && exciseSpan.Contains(d.cmp, m.LargestRangeKey.UserKey) { + rightFile.HasRangeKeys = false + rightFile.Largest = m.LargestPointKey + } + if rightFile.HasPointKeys { + var err error + if iter == nil && rangeDelIter == nil { + iter, rangeDelIter, err = d.newIters(context.TODO(), m, &IterOptions{level: manifest.Level(level)}, internalIterOpts{}) + if err != nil { + return nil, err + } + if iter != nil { + defer iter.Close() + } else { + iter = emptyIter + } + if rangeDelIter != nil { + defer rangeDelIter.Close() + } else { + rangeDelIter = emptyKeyspanIter + } + } + // Store the max of (exciseSpan.End, rdel.Start) in firstRangeDel. This + // needs to be a copy if the key is owned by the range del iter. + key, _ := iter.SeekGE(exciseSpan.End, base.SeekGEFlagsNone) + var firstRangeDel []byte + rdel := rangeDelIter.SeekGE(exciseSpan.End) + if rdel != nil { + firstRangeDel = append(firstRangeDel[:0], rdel.Start...) + if d.cmp(firstRangeDel, exciseSpan.End) < 0 { + firstRangeDel = exciseSpan.End + } + } + rightFile.HasPointKeys = key != nil || firstRangeDel != nil + if key != nil && (firstRangeDel == nil || base.InternalCompare(d.cmp, *key, rdel.SmallestKey()) < 0) { + rightFile.SmallestPointKey = key.Clone() + } else if firstRangeDel != nil { + // key == nil || firstRangeDel <= key.UserKey. 
+ rightFile.SmallestPointKey = rdel.SmallestKey() + rightFile.SmallestPointKey.UserKey = firstRangeDel + } + rightFile.Smallest = rightFile.SmallestPointKey + } + if rightFile.HasRangeKeys { + if rangeKeyIter == nil { + var err error + rangeKeyIter, err = d.tableNewRangeKeyIter(m, keyspan.SpanIterOptions{}) + if err != nil { + return nil, err + } + defer rangeKeyIter.Close() + } + // Store the max of (exciseSpan.End, rkey.Start) in firstRangeKey. This + // needs to be a copy if the key is owned by the range key iter. + var firstRangeKey []byte + rkey := rangeKeyIter.SeekGE(exciseSpan.End) + if rkey != nil { + firstRangeKey = append(firstRangeKey[:0], rkey.Start...) + if d.cmp(firstRangeKey, exciseSpan.End) < 0 { + firstRangeKey = exciseSpan.End + } + } + rightFile.HasRangeKeys = firstRangeKey != nil + if rightFile.HasRangeKeys { + rightFile.SmallestRangeKey = rkey.SmallestKey() + rightFile.SmallestRangeKey.UserKey = firstRangeKey + if !rightFile.HasPointKeys || base.InternalCompare(d.cmp, rightFile.SmallestPointKey, rightFile.SmallestRangeKey) > 0 { + rightFile.Smallest = rightFile.SmallestRangeKey + } + } + } + if rightFile.HasRangeKeys || rightFile.HasPointKeys { + var err error + rightFile.Size, err = d.tableCache.estimateSize(m, rightFile.Smallest.UserKey, rightFile.Largest.UserKey) + if err != nil { + return nil, err + } + ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: rightFile}) + if !backingTableCreated { + ve.CreatedBackingTables = append(ve.CreatedBackingTables, rightFile.FileBacking) + backingTableCreated = true + } + numCreatedFiles++ + } + + if err := rightFile.Validate(d.cmp, d.opts.Comparer.FormatKey); err != nil { + return nil, err + } + return ve.NewFiles[len(ve.NewFiles)-numCreatedFiles:], nil +} + type ingestTargetLevelFunc func( newIters tableNewIters, newRangeKeyIter keyspan.TableNewSpanIter, @@ -1049,13 +1485,20 @@ type ingestTargetLevelFunc func( ) (int, error) func (d *DB) ingestApply( - jobID int, meta []*fileMetadata, findTargetLevel ingestTargetLevelFunc, mut *memTable, + jobID int, + lr ingestLoadResult, + findTargetLevel ingestTargetLevelFunc, + mut *memTable, + exciseSpan KeyRange, ) (*versionEdit, error) { d.mu.Lock() defer d.mu.Unlock() ve := &versionEdit{ - NewFiles: make([]newFileEntry, len(meta)), + NewFiles: make([]newFileEntry, len(lr.localMeta)+len(lr.sharedMeta)), + } + if exciseSpan.Valid() { + ve.DeletedFiles = map[manifest.DeletedFileEntry]*manifest.FileMetadata{} } metrics := make(map[int]*LevelMetrics) @@ -1079,13 +1522,27 @@ func (d *DB) ingestApply( current := d.mu.versions.currentVersion() baseLevel := d.mu.versions.picker.getBaseLevel() iterOps := IterOptions{logger: d.opts.Logger} - for i := range meta { + for i := 0; i < len(lr.localMeta)+len(lr.sharedMeta); i++ { // Determine the lowest level in the LSM for which the sstable doesn't // overlap any existing files in the level. 
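+		// Shared files skip this search: they are pinned to the level recorded in
+		// their SharedSSTMeta (which must be sharedLevelsStart or lower in the
+		// LSM), while local files still go through findTargetLevel.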
- m := meta[i] + var m *fileMetadata + sharedIdx := -1 + if i < len(lr.localMeta) { + m = lr.localMeta[i] + } else { + sharedIdx = i - len(lr.localMeta) + m = lr.sharedMeta[sharedIdx] + } f := &ve.NewFiles[i] var err error - f.Level, err = findTargetLevel(d.newIters, d.tableNewRangeKeyIter, iterOps, d.cmp, current, baseLevel, d.mu.compact.inProgress, m) + if sharedIdx >= 0 { + f.Level = int(lr.sharedLevels[sharedIdx]) + if f.Level < sharedLevelsStart { + panic("cannot slot a shared file higher than the highest shared level") + } + } else { + f.Level, err = findTargetLevel(d.newIters, d.tableNewRangeKeyIter, iterOps, d.cmp, current, baseLevel, d.mu.compact.inProgress, m) + } if err != nil { d.mu.versions.logUnlock() return nil, err @@ -1101,6 +1558,47 @@ func (d *DB) ingestApply( levelMetrics.BytesIngested += m.Size levelMetrics.TablesIngested++ } + if exciseSpan.Valid() { + // Release the db mutex. We don't need to hold it while we do the excise; + // we just need to prevent any new versions from being installed between + // now and the logAndApply() below. We achieve this by holding onto the + // manifest lock. And since d.excise does IO, it's preferable to release + // d.mu at this time. + // + // Note that since there's a `defer d.mu.Unlock()` above, we need to relock + // the db mutex before we return an error. + d.mu.Unlock() + for level := range current.Levels { + iter := current.Levels[level].Iter() + for m := iter.SeekGE(d.cmp, exciseSpan.Start); m != nil && d.cmp(m.Smallest.UserKey, exciseSpan.End) < 0; m = iter.Next() { + excised, err := d.excise(exciseSpan, m, ve, level) + if err != nil { + d.mu.Lock() + return nil, err + } + + if _, ok := ve.DeletedFiles[deletedFileEntry{ + Level: level, + FileNum: m.FileNum, + }]; !ok { + // We did not excise this file. 
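+					// (For example, excise() leaves m alone when m.Largest is an exclusive
+					// sentinel exactly at exciseSpan.Start, i.e. there is no real overlap.)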
+ continue + } + levelMetrics := metrics[level] + if levelMetrics == nil { + levelMetrics = &LevelMetrics{} + metrics[level] = levelMetrics + } + levelMetrics.NumFiles-- + levelMetrics.Size -= int64(m.Size) + for i := range excised { + levelMetrics.NumFiles++ + levelMetrics.Size += int64(excised[i].Meta.Size) + } + } + } + d.mu.Lock() + } if err := d.mu.versions.logAndApply(jobID, ve, metrics, false /* forceRotation */, func() []compactionInfo { return d.getInProgressCompactionInfoLocked(nil) }); err != nil { diff --git a/ingest_test.go b/ingest_test.go index 22a8ab9b42..b591b34cba 100644 --- a/ingest_test.go +++ b/ingest_test.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/pebble/internal/testkeys" "github.com/cockroachdb/pebble/objstorage" "github.com/cockroachdb/pebble/objstorage/objstorageprovider" + "github.com/cockroachdb/pebble/objstorage/shared" "github.com/cockroachdb/pebble/record" "github.com/cockroachdb/pebble/sstable" "github.com/cockroachdb/pebble/vfs" @@ -124,12 +125,12 @@ func TestIngestLoad(t *testing.T) { Comparer: DefaultComparer, FS: mem, }).WithFSDefaults() - meta, _, err := ingestLoad(opts, dbVersion, []string{"ext"}, 0, []base.DiskFileNum{base.FileNum(1).DiskFileNum()}) + lr, err := ingestLoad(opts, dbVersion, []string{"ext"}, nil /* shared */, 0, []base.DiskFileNum{base.FileNum(1).DiskFileNum()}, nil /* provider */) if err != nil { return err.Error() } var buf bytes.Buffer - for _, m := range meta { + for _, m := range lr.localMeta { fmt.Fprintf(&buf, "%d: %s-%s\n", m.FileNum, m.Smallest, m.Largest) fmt.Fprintf(&buf, " points: %s-%s\n", m.SmallestPointKey, m.LargestPointKey) fmt.Fprintf(&buf, " ranges: %s-%s\n", m.SmallestRangeKey, m.LargestRangeKey) @@ -211,13 +212,13 @@ func TestIngestLoadRand(t *testing.T) { Comparer: DefaultComparer, FS: mem, }).WithFSDefaults() - meta, _, err := ingestLoad(opts, version, paths, 0, pending) + lr, err := ingestLoad(opts, version, paths, nil /* shared */, 0, pending, nil /* provider */) require.NoError(t, err) - for _, m := range meta { + for _, m := range lr.localMeta { m.CreationTime = 0 } - if diff := pretty.Diff(expected, meta); diff != nil { + if diff := pretty.Diff(expected, lr.localMeta); diff != nil { t.Fatalf("%s", strings.Join(diff, "\n")) } } @@ -232,7 +233,7 @@ func TestIngestLoadInvalid(t *testing.T) { Comparer: DefaultComparer, FS: mem, }).WithFSDefaults() - if _, _, err := ingestLoad(opts, internalFormatNewest, []string{"invalid"}, 0, []base.DiskFileNum{base.FileNum(1).DiskFileNum()}); err == nil { + if _, err := ingestLoad(opts, internalFormatNewest, []string{"invalid"}, nil /* shared */, 0, []base.DiskFileNum{base.FileNum(1).DiskFileNum()}, nil /* provider */); err == nil { t.Fatalf("expected error, but found success") } } @@ -273,7 +274,8 @@ func TestIngestSortAndVerify(t *testing.T) { meta = append(meta, m) paths = append(paths, strconv.Itoa(i)) } - err := ingestSortAndVerify(cmp, meta, paths) + lr := ingestLoadResult{localPaths: paths, localMeta: meta} + err := ingestSortAndVerify(cmp, lr, KeyRange{}) if err != nil { return fmt.Sprintf("%v\n", err) } @@ -570,6 +572,267 @@ func TestOverlappingIngestedSSTs(t *testing.T) { }) } +func TestExcise(t *testing.T) { + var mem vfs.FS + var d *DB + var flushed bool + defer func() { + require.NoError(t, d.Close()) + }() + + reset := func() { + if d != nil { + require.NoError(t, d.Close()) + } + + mem = vfs.NewMem() + require.NoError(t, mem.MkdirAll("ext", 0755)) + opts := &Options{ + FS: mem, + L0CompactionThreshold: 100, + L0StopWritesThreshold: 100, + DebugCheck: 
DebugCheckLevels, + EventListener: &EventListener{FlushEnd: func(info FlushInfo) { + flushed = true + }}, + FormatMajorVersion: FormatNewest, + } + // Disable automatic compactions because otherwise we'll race with + // delete-only compactions triggered by ingesting range tombstones. + opts.DisableAutomaticCompactions = true + + var err error + d, err = Open("", opts) + require.NoError(t, err) + } + reset() + + datadriven.RunTest(t, "testdata/excise", func(t *testing.T, td *datadriven.TestData) string { + switch td.Cmd { + case "reset": + reset() + return "" + case "batch": + b := d.NewIndexedBatch() + if err := runBatchDefineCmd(td, b); err != nil { + return err.Error() + } + if err := b.Commit(nil); err != nil { + return err.Error() + } + return "" + case "build": + if err := runBuildCmd(td, d, mem); err != nil { + return err.Error() + } + return "" + + case "flush": + if err := d.Flush(); err != nil { + return err.Error() + } + return "" + + case "ingest": + flushed = false + if err := runIngestCmd(td, d, mem); err != nil { + return err.Error() + } + // Wait for a possible flush. + d.mu.Lock() + for d.mu.compact.flushing { + d.mu.compact.cond.Wait() + } + d.mu.Unlock() + if flushed { + return "memtable flushed" + } + return "" + + case "ingest-and-excise": + flushed = false + if err := runIngestAndExciseCmd(td, d, mem); err != nil { + return err.Error() + } + // Wait for a possible flush. + d.mu.Lock() + for d.mu.compact.flushing { + d.mu.compact.cond.Wait() + } + d.mu.Unlock() + if flushed { + return "memtable flushed" + } + return "" + + case "get": + return runGetCmd(t, td, d) + + case "iter": + iter := d.NewIter(&IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + }) + return runIterCmd(td, iter, true) + + case "lsm": + return runLSMCmd(td, d) + + case "metrics": + // The asynchronous loading of table stats can change metrics, so + // wait for all the tables' stats to be loaded. 
+ d.mu.Lock() + d.waitTableStats() + d.mu.Unlock() + + return d.Metrics().String() + + case "wait-pending-table-stats": + return runTableStatsCmd(td, d) + + case "excise": + ve := &versionEdit{ + DeletedFiles: map[deletedFileEntry]*fileMetadata{}, + } + var exciseSpan KeyRange + if len(td.CmdArgs) != 2 { + panic("insufficient args for compact command") + } + exciseSpan.Start = []byte(td.CmdArgs[0].Key) + exciseSpan.End = []byte(td.CmdArgs[1].Key) + + d.mu.Lock() + d.mu.versions.logLock() + d.mu.Unlock() + current := d.mu.versions.currentVersion() + for level := range current.Levels { + iter := current.Levels[level].Iter() + for m := iter.SeekGE(d.cmp, exciseSpan.Start); m != nil && d.cmp(m.Smallest.UserKey, exciseSpan.End) < 0; m = iter.Next() { + _, err := d.excise(exciseSpan, m, ve, level) + if err != nil { + d.mu.Lock() + d.mu.versions.logUnlock() + d.mu.Unlock() + return fmt.Sprintf("error when excising %s: %s", m.FileNum, err.Error()) + } + } + } + d.mu.Lock() + d.mu.versions.logUnlock() + d.mu.Unlock() + return fmt.Sprintf("would excise %d files, use ingest-and-excise to excise.\n%s", len(ve.DeletedFiles), ve.String()) + + case "compact": + if len(td.CmdArgs) != 2 { + panic("insufficient args for compact command") + } + l := td.CmdArgs[0].Key + r := td.CmdArgs[1].Key + err := d.Compact([]byte(l), []byte(r), false) + if err != nil { + return err.Error() + } + return "" + default: + return fmt.Sprintf("unknown command: %s", td.Cmd) + } + }) +} + +func TestIngestShared(t *testing.T) { + mem := vfs.NewMem() + var d *DB + var provider2 objstorage.Provider + opts2 := Options{FS: vfs.NewMem()} + opts2.EnsureDefaults() + + // Create an objProvider where we will fake-create some sstables that can + // then be shared back to the db instance. + providerSettings := objstorageprovider.Settings{ + Logger: opts2.Logger, + FS: opts2.FS, + FSDirName: "", + FSDirInitialListing: nil, + FSCleaner: opts2.Cleaner, + NoSyncOnClose: opts2.NoSyncOnClose, + BytesPerSync: opts2.BytesPerSync, + } + providerSettings.Shared.Storage = shared.NewInMem() + + provider2, err := objstorageprovider.Open(providerSettings) + require.NoError(t, err) + creatorIDCounter := uint64(1) + provider2.SetCreatorID(objstorage.CreatorID(creatorIDCounter)) + creatorIDCounter++ + + defer func() { + require.NoError(t, d.Close()) + }() + + reset := func() { + if d != nil { + require.NoError(t, d.Close()) + } + + mem = vfs.NewMem() + require.NoError(t, mem.MkdirAll("ext", 0755)) + opts := &Options{ + FS: mem, + L0CompactionThreshold: 100, + L0StopWritesThreshold: 100, + FormatMajorVersion: FormatNewest, + } + opts.Experimental.SharedStorage = providerSettings.Shared.Storage + + var err error + d, err = Open("", opts) + require.NoError(t, err) + require.NoError(t, d.SetCreatorID(creatorIDCounter)) + creatorIDCounter++ + } + reset() + + metaMap := map[base.DiskFileNum]objstorage.ObjectMetadata{} + + require.NoError(t, d.Set([]byte("d"), []byte("unexpected"), nil)) + require.NoError(t, d.Set([]byte("e"), []byte("unexpected"), nil)) + require.NoError(t, d.Set([]byte("a"), []byte("unexpected"), nil)) + require.NoError(t, d.Set([]byte("f"), []byte("unexpected"), nil)) + d.Flush() + + { + // Create a shared file. 
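+		// The table is written directly through provider2 (standing in for another
+		// node's object store), so that its SharedObjectBacking can be handed to d
+		// below and ingested via IngestAndExcise.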
+ fn := base.FileNum(2) + f, meta, err := provider2.Create(context.TODO(), fileTypeTable, fn.DiskFileNum(), objstorage.CreateOptions{PreferSharedStorage: true}) + require.NoError(t, err) + w := sstable.NewWriter(f, d.opts.MakeWriterOptions(0, d.opts.FormatMajorVersion.MaxTableFormat())) + w.Set([]byte("d"), []byte("shared")) + w.Set([]byte("e"), []byte("shared")) + w.Close() + metaMap[fn.DiskFileNum()] = meta + } + + m := metaMap[base.FileNum(2).DiskFileNum()] + handle, err := provider2.SharedObjectBacking(&m) + require.NoError(t, err) + size, err := provider2.Size(m) + require.NoError(t, err) + + sharedSSTMeta := SharedSSTMeta{ + Backing: handle, + Smallest: base.MakeInternalKey([]byte("d"), 0, InternalKeyKindSet), + Largest: base.MakeInternalKey([]byte("e"), 0, InternalKeyKindSet), + SmallestPointKey: base.MakeInternalKey([]byte("d"), 0, InternalKeyKindSet), + LargestPointKey: base.MakeInternalKey([]byte("e"), 0, InternalKeyKindSet), + Level: 6, + Size: uint64(size + 5), + } + _, err = d.IngestAndExcise([]string{}, []SharedSSTMeta{sharedSSTMeta}, KeyRange{Start: []byte("d"), End: []byte("ee")}) + require.NoError(t, err) + + // TODO(bilal): Once reading of shared sstables is in, verify that the values + // of d and e have been updated. +} + func TestIngestMemtableOverlaps(t *testing.T) { comparers := []Comparer{ {Name: "default", Compare: DefaultComparer.Compare, FormatKey: DefaultComparer.FormatKey}, diff --git a/internal/manifest/l0_sublevels.go b/internal/manifest/l0_sublevels.go index 3646a034e8..0b81d8abad 100644 --- a/internal/manifest/l0_sublevels.go +++ b/internal/manifest/l0_sublevels.go @@ -894,6 +894,16 @@ type UserKeyRange struct { Start, End []byte } +// Valid returns true if the UserKeyRange is defined. +func (k *UserKeyRange) Valid() bool { + return k.Start != nil && k.End != nil +} + +// Contains returns whether the specified key exists in the UserKeyRange. +func (k *UserKeyRange) Contains(cmp base.Compare, key []byte) bool { + return cmp(k.Start, key) <= 0 && cmp(key, k.End) < 0 +} + // InUseKeyRanges returns the merged table bounds of L0 files overlapping the // provided user key range. The returned key ranges are sorted and // nonoverlapping. diff --git a/internal/manifest/version.go b/internal/manifest/version.go index ca16ba1369..9295571bba 100644 --- a/internal/manifest/version.go +++ b/internal/manifest/version.go @@ -227,6 +227,12 @@ type FileMetadata struct { // we'd have to write virtual sstable stats to the version edit. Stats TableStats + // Deleted is set to true if a VersionEdit gets installed that has deleted + // this file. Protected by the manifest lock (see versionSet.logLock()). + Deleted bool + + // For L0 files only. Protected by DB.mu. Used to generate L0 sublevels and + // pick L0 compactions. Only accurate for the most recent Version. SubLevel int L0Index int minIntervalIndex int @@ -235,9 +241,6 @@ type FileMetadata struct { // NB: the alignment of this struct is 8 bytes. We pack all the bools to // ensure an optimal packing. - // For L0 files only. Protected by DB.mu. Used to generate L0 sublevels and - // pick L0 compactions. Only accurate for the most recent Version. - // // IsIntraL0Compacting is set to True if this file is part of an intra-L0 // compaction. When it's true, IsCompacting must also return true. 
If // Compacting is true and IsIntraL0Compacting is false for an L0 file, the @@ -370,6 +373,17 @@ func (m *FileMetadata) InitPhysicalBacking() { } } +// InitProviderBacking creates a new FileBacking for a file backed by +// an objstorage.Provider. +func (m *FileMetadata) InitProviderBacking(fileNum base.DiskFileNum, size uint64) { + if !m.Virtual { + panic("pebble: provider-backed sstables must be virtual") + } + if m.FileBacking == nil { + m.FileBacking = &FileBacking{Size: m.Size, DiskFileNum: m.FileNum.DiskFileNum()} + } +} + // ValidateVirtual should be called once the FileMetadata for a virtual sstable // is created to verify that the fields of the virtual sstable are sound. func (m *FileMetadata) ValidateVirtual(createdFrom *FileMetadata) { diff --git a/open.go b/open.go index 1396e0f3cd..85fcc4b3fe 100644 --- a/open.go +++ b/open.go @@ -804,13 +804,14 @@ func (d *DB) replayWAL( paths[i] = base.MakeFilepath(d.opts.FS, d.dirname, fileTypeTable, n) } - var meta []*manifest.FileMetadata - meta, _, err = ingestLoad( - d.opts, d.mu.formatVers.vers, paths, d.cacheID, fileNums, + var lr ingestLoadResult + lr, err = ingestLoad( + d.opts, d.mu.formatVers.vers, paths, nil /* shared */, d.cacheID, fileNums, d.objProvider, ) if err != nil { return nil, 0, err } + meta := lr.localMeta if uint32(len(meta)) != b.Count() { panic("pebble: couldn't load all files in WAL entry.") diff --git a/scan_internal.go b/scan_internal.go index beee42004f..c2a2f3cf16 100644 --- a/scan_internal.go +++ b/scan_internal.go @@ -736,10 +736,6 @@ func (d *DB) truncateSharedFile( // We will need to truncate file bounds in at least one direction. Open all // relevant iterators. - // - // TODO(bilal): Once virtual sstables go in, verify that the constraining of - // bounds to virtual sstable bounds happens below this method, so we aren't - // unintentionally exposing keys we shouldn't be exposing. 
iter, rangeDelIter, err := d.newIters(ctx, file, &IterOptions{ LowerBound: lower, UpperBound: upper, @@ -843,6 +839,10 @@ func (d *DB) truncateSharedFile( if len(sst.Smallest.UserKey) == 0 { return nil, true, nil } + sst.Size, err = d.tableCache.estimateSize(file, sst.Smallest.UserKey, sst.Largest.UserKey) + if err != nil { + return nil, false, err + } return sst, false, nil } diff --git a/table_cache.go b/table_cache.go index 186b928d9b..454e211d70 100644 --- a/table_cache.go +++ b/table_cache.go @@ -170,6 +170,32 @@ func (c *tableCacheContainer) metrics() (CacheMetrics, FilterMetrics) { return m, f } +func (c *tableCacheContainer) estimateSize( + meta *fileMetadata, lower, upper []byte, +) (size uint64, err error) { + if meta.Virtual { + err = c.withVirtualReader( + meta.VirtualMeta(), + func(r sstable.VirtualReader) (err error) { + size, err = r.EstimateDiskUsage(lower, upper) + return err + }, + ) + } else { + err = c.withReader( + meta.PhysicalMeta(), + func(r *sstable.Reader) (err error) { + size, err = r.EstimateDiskUsage(lower, upper) + return err + }, + ) + } + if err != nil { + return 0, err + } + return size, nil +} + func (c *tableCacheContainer) withReader(meta physicalMeta, fn func(*sstable.Reader) error) error { s := c.tableCache.getShard(meta.FileBacking.DiskFileNum) v := s.findNode(meta.FileMetadata, &c.dbOpts) diff --git a/table_stats.go b/table_stats.go index ef6f0c74da..b3c4c8b3ac 100644 --- a/table_stats.go +++ b/table_stats.go @@ -186,6 +186,11 @@ func (d *DB) loadNewFileStats( continue } + if nf.Meta.Virtual { + // cannot load virtual table stats + continue + } + stats, newHints, err := d.loadTableStats( rs.current, nf.Level, nf.Meta.PhysicalMeta(), diff --git a/testdata/excise b/testdata/excise new file mode 100644 index 0000000000..4acf0b107f --- /dev/null +++ b/testdata/excise @@ -0,0 +1,165 @@ + +build ext0 format=pebblev2 +set a 1 +set l 2 +---- + +ingest ext0 +---- + +lsm +---- +6: + 000004:[a#10,SET-l#10,SET] + + +batch +set d foo +set f bar +---- + +flush +---- + +lsm +---- +0.0: + 000006:[d#11,SET-f#12,SET] +6: + 000004:[a#10,SET-l#10,SET] + +excise c k +---- +would excise 2 files, use ingest-and-excise to excise. + deleted: L0 000006 + deleted: L6 000004 + added: L6 000007:[a#10,1-a#10,1] + added: L6 000008:[l#10,1-l#10,1] + + +excise a e +---- +would excise 2 files, use ingest-and-excise to excise. + deleted: L0 000006 + deleted: L6 000004 + added: L0 000009:[f#12,1-f#12,1] + added: L6 000010:[l#10,1-l#10,1] + +excise e z +---- +would excise 2 files, use ingest-and-excise to excise. + deleted: L0 000006 + deleted: L6 000004 + added: L0 000011:[d#11,1-d#11,1] + added: L6 000012:[a#10,1-a#10,1] + +excise f l +---- +would excise 2 files, use ingest-and-excise to excise. + deleted: L0 000006 + deleted: L6 000004 + added: L0 000013:[d#11,1-d#11,1] + added: L6 000014:[a#10,1-a#10,1] + added: L6 000015:[l#10,1-l#10,1] + +excise f ll +---- +would excise 2 files, use ingest-and-excise to excise. + deleted: L0 000006 + deleted: L6 000004 + added: L0 000016:[d#11,1-d#11,1] + added: L6 000017:[a#10,1-a#10,1] + +excise p q +---- +would excise 0 files, use ingest-and-excise to excise. + +lsm +---- +0.0: + 000006:[d#11,SET-f#12,SET] +6: + 000004:[a#10,SET-l#10,SET] + +build ext1 format=pebblev2 +set d foo3 +set e bar2 +---- + +ingest-and-excise ext1 excise="c-k" +---- + +lsm +---- +0.0: + 000018:[d#13,SET-e#13,SET] +6: + 000019:[l#10,SET-l#10,SET] + +iter +first +next +next +next +---- +d: (foo3, .) +e: (bar2, .) +l: (2, .) +. 
+ +# More complex cases, with the truncation of file bounds happening at rangedel +# and rangekey bounds. + +reset +---- + +build ext3 format=pebblev2 +range-key-set c f @4 foobar +---- + +ingest ext3 +---- + +build ext4 format=pebblev2 +set b bar +del-range g i +---- + +ingest ext4 +---- + +lsm +---- +0.0: + 000005:[b#11,SET-i#inf,RANGEDEL] +6: + 000004:[c#10,RANGEKEYSET-f#inf,RANGEKEYSET] + +excise f g +---- +would excise 1 files, use ingest-and-excise to excise. + deleted: L0 000005 + added: L0 000006:[b#11,1-b#11,1] + added: L0 000007:[g#11,15-i#72057594037927935,15] + +excise b c +---- +would excise 1 files, use ingest-and-excise to excise. + deleted: L0 000005 + added: L0 000008:[g#11,15-i#72057594037927935,15] + +excise i j +---- +would excise 0 files, use ingest-and-excise to excise. + +# Excise mid range key. This will not happen in practice, but excise() +# supports it. + +excise c d +---- +would excise 2 files, use ingest-and-excise to excise. + deleted: L0 000005 + deleted: L6 000004 + added: L0 000009:[b#11,1-b#11,1] + added: L0 000010:[g#11,15-i#72057594037927935,15] + added: L6 000011:[d#10,21-f#72057594037927935,21] diff --git a/version_set.go b/version_set.go index 41246abc1d..7005683f61 100644 --- a/version_set.go +++ b/version_set.go @@ -569,6 +569,14 @@ func (vs *versionSet) logAndApply( for fileNum, size := range zombies { vs.zombieTables[fileNum] = size } + // Update the Deleted bools. We can't use the zombieTables struct for this + // as it works on FileBackings, not FileMetadatas. + for _, f := range ve.DeletedFiles { + f.Deleted = true + } + for i := range ve.NewFiles { + ve.NewFiles[i].Meta.Deleted = false + } // Install the new version. vs.append(newVersion)