diff --git a/compaction.go b/compaction.go index 0389f514b8..f26413992c 100644 --- a/compaction.go +++ b/compaction.go @@ -2518,14 +2518,23 @@ func (d *DB) compactAndWrite( MaxGrandparentOverlapBytes: c.maxOverlapBytes, TargetOutputFileSize: c.maxOutputFileSize, } + considerCreateShared := remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level) + if considerCreateShared { + runnerCfg.ConsiderCreateShared = true + runnerCfg.SharedLowerUserKeyPrefix = d.opts.Experimental.SharedLowerUserKeyPrefix + } runner := compact.NewRunner(runnerCfg, iter) - for runner.MoreDataToWrite() { + for { + moreData, keyShouldBeWrittenToShared := runner.MoreDataToWrite() + if !moreData { + break + } if c.cancel.Load() { return runner.Finish().WithError(ErrCancelledCompaction) } // Create a new table. writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level, tableFormat) - objMeta, tw, cpuWorkHandle, err := d.newCompactionOutput(jobID, c, writerOpts) + objMeta, tw, cpuWorkHandle, err := d.newCompactionOutput(jobID, c, writerOpts, keyShouldBeWrittenToShared) if err != nil { return runner.Finish().WithError(err) } @@ -2652,7 +2661,7 @@ func (c *compaction) makeVersionEdit(result compact.Result) (*versionEdit, error // newCompactionOutput creates an object for a new table produced by a // compaction or flush. func (d *DB) newCompactionOutput( - jobID JobID, c *compaction, writerOpts sstable.WriterOptions, + jobID JobID, c *compaction, writerOpts sstable.WriterOptions, preferSharedStorage bool, ) (objstorage.ObjectMetadata, *sstable.Writer, CPUWorkHandle, error) { d.mu.Lock() diskFileNum := d.mu.versions.getNextDiskFileNum() @@ -2689,7 +2698,7 @@ func (d *DB) newCompactionOutput( // Prefer shared storage if present. createOpts := objstorage.CreateOptions{ - PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level), + PreferSharedStorage: preferSharedStorage, WriteCategory: writeCategory, } writable, objMeta, err := d.objProvider.Create(ctx, fileTypeTable, diskFileNum, createOpts) @@ -2721,6 +2730,9 @@ func (d *DB) newCompactionOutput( d.opts.Experimental.MaxWriterConcurrency > 0 && (cpuWorkHandle.Permitted() || d.opts.Experimental.ForceWriterParallelism) + if d.opts.Experimental.WriteSharedWithStrictObsolete && objMeta.IsShared() { + writerOpts.IsStrictObsolete = true + } tw := sstable.NewWriter(writable, writerOpts, cacheOpts) return objMeta, tw, cpuWorkHandle, nil } diff --git a/db.go b/db.go index 34c4e44319..a08850ec21 100644 --- a/db.go +++ b/db.go @@ -1250,6 +1250,17 @@ func finishInitializingIter(ctx context.Context, buf *iterAlloc) *Iterator { // creator ID was set (as creator IDs are necessary to enable shared storage) // resulting in some lower level SSTs being on non-shared storage. Skip-shared // iteration is invalid in those cases. +// +// The above error handling implies that ScanInternal with a non-nil +// VisitSharedFile can only be called without error if both the following +// conditions are true: +// +// - All files in the LSM conform to remote.CreateOnSharedLower strategy (they +// can of course conform to the stronger remote.CreateOnSharedAll). +// +// - If Options.Experimental.SharedLowerUserKeyPrefix is non-nil, the lower +// parameter must have a prefix that is greater than or equal to this lower +// bound. func (d *DB) ScanInternal( ctx context.Context, categoryAndQoS sstable.CategoryAndQoS, diff --git a/internal/compact/run.go b/internal/compact/run.go index f7591e9dad..7e2cb53886 100644 --- a/internal/compact/run.go +++ b/internal/compact/run.go @@ -79,6 +79,14 @@ type RunnerConfig struct { // during compaction. In practice, the sizes can vary between 50%-200% of this // value. TargetOutputFileSize uint64 + + // ConsiderCreateShared is true if this compaction can write shared files if + // SharedLowerUserKeyPrefix allows. + ConsiderCreateShared bool + + // Set equal to Options.Experimental.SharedLowerUserKeyPrefix, when + // ConsiderCreateShared is true, else nil. + SharedLowerUserKeyPrefix []byte } // Runner is a helper for running the "data" part of a compaction (where we use @@ -97,6 +105,11 @@ type Runner struct { cfg RunnerConfig iter *Iter + split base.Split + // At most one state transition from false => true, when the next key should + // be written to shared storage. + keyShouldBeWrittenToShared bool + tables []OutputTable // Stores any error encountered. err error @@ -113,20 +126,67 @@ type Runner struct { // NewRunner creates a new Runner. func NewRunner(cfg RunnerConfig, iter *Iter) *Runner { r := &Runner{ - cmp: iter.cmp, - cfg: cfg, - iter: iter, + cmp: iter.cmp, + cfg: cfg, + iter: iter, + split: iter.cfg.Comparer.Split, + } + if cfg.SharedLowerUserKeyPrefix == nil { + r.keyShouldBeWrittenToShared = cfg.ConsiderCreateShared + } else if r.cmp(cfg.CompactionBounds.Start[:r.split(cfg.CompactionBounds.Start)], cfg.SharedLowerUserKeyPrefix) >= 0 { + // All keys in the compaction are in the shared key space. + r.keyShouldBeWrittenToShared = true + // No more need to do key comparisons. + r.cfg.SharedLowerUserKeyPrefix = nil + } else { + // Check if the compaction will only write non-shared keys. + endKeyPrefix := base.UserKeyBoundary{ + Key: cfg.CompactionBounds.End.Key[:r.split(cfg.CompactionBounds.End.Key)], + Kind: cfg.CompactionBounds.End.Kind, + } + // By taking the prefix, we turn an exclusive user-key bound into an inclusive user-key prefix bound. + if endKeyPrefix.Kind == base.Exclusive && len(endKeyPrefix.Key) < len(cfg.CompactionBounds.End.Key) { + endKeyPrefix.Kind = base.Inclusive + } + c := r.cmp(endKeyPrefix.Key, cfg.SharedLowerUserKeyPrefix) + if c < 0 || c == 0 && endKeyPrefix.Kind == base.Exclusive { + r.keyShouldBeWrittenToShared = false + // No more need to do key comparisons, since compaction only writes + // non-shared keys. + r.cfg.SharedLowerUserKeyPrefix = nil + } } r.key, r.value = r.iter.First() return r } // MoreDataToWrite returns true if there is more data to be written. -func (r *Runner) MoreDataToWrite() bool { +func (r *Runner) MoreDataToWrite() (moreData bool, keyShouldBeWrittenToShared bool) { if r.err != nil { - return false + return false, false + } + moreData = r.key != nil || !r.lastRangeDelSpan.Empty() || !r.lastRangeKeySpan.Empty() + if moreData && !r.keyShouldBeWrittenToShared && r.cfg.SharedLowerUserKeyPrefix != nil { + // May be stepping to a shared key. + firstKey := base.MinUserKey(r.cmp, spanStartOrNil(&r.lastRangeDelSpan), spanStartOrNil(&r.lastRangeKeySpan)) + if r.key != nil && firstKey == nil { + firstKey = r.key.UserKey + } + if firstKey == nil { + panic(base.AssertionFailedf("no data to write")) + } + firstKeyPrefix := firstKey[:r.split(firstKey)] + cmp := r.cmp(firstKeyPrefix, r.cfg.SharedLowerUserKeyPrefix) + if cmp >= 0 { + r.keyShouldBeWrittenToShared = true + // No more need to do key comparisons. + r.cfg.SharedLowerUserKeyPrefix = nil + } + // Else cmp < 0, i.e., firstKeyPrefix < SharedLowerUserKeyPrefix. So + // firstKey < SharedLowerUserKeyPrefix, and we can safely use the latter + // as the split-limit in writeKeysToTable below. } - return r.key != nil || !r.lastRangeDelSpan.Empty() || !r.lastRangeKeySpan.Empty() + return moreData, r.keyShouldBeWrittenToShared } // WriteTable writes a new output table. This table will be part of @@ -168,8 +228,13 @@ func (r *Runner) writeKeysToTable(tw *sstable.Writer) (splitKey []byte, _ error) if firstKey == nil { return nil, base.AssertionFailedf("no data to write") } + tableSplitLimit := r.TableSplitLimit(firstKey) + if r.cfg.SharedLowerUserKeyPrefix != nil && + (tableSplitLimit == nil || r.cmp(r.cfg.SharedLowerUserKeyPrefix, tableSplitLimit) < 0) { + tableSplitLimit = r.cfg.SharedLowerUserKeyPrefix + } splitter := NewOutputSplitter( - r.cmp, firstKey, r.TableSplitLimit(firstKey), + r.cmp, firstKey, tableSplitLimit, r.cfg.TargetOutputFileSize, r.cfg.Grandparents.Iter(), r.iter.Frontiers(), ) lastUserKeyFn := func() []byte { diff --git a/internal/compact/splitting.go b/internal/compact/splitting.go index e63564b48d..94fd4639c4 100644 --- a/internal/compact/splitting.go +++ b/internal/compact/splitting.go @@ -102,6 +102,8 @@ type OutputSplitter struct { shouldSplitCalled bool + pastStartKey bool + nextBoundary splitterBoundary // reachedBoundary is set when the frontier reaches a boundary and is cleared // in the first ShouldSplitBefore call after that. @@ -212,8 +214,18 @@ func (s *OutputSplitter) ShouldSplitBefore( panic("ShouldSplitBefore called after it returned SplitNow") } if !s.shouldSplitCalled { - // The boundary could have been advanced to nextUserKey before the splitter - // was created. So one single time, we advance the boundary manually. + // The boundary could have been advanced to nextUserKey before the + // splitter was created (the compact.Iter was at nextUserKey when a + // previous OutputSplitter decided to split-before). So one single time, + // we advance the boundary manually. + // + // Note that this first nextUserKey can be ahead of + // OutputSplitter.startKey, since the startKey is decided by the previous + // split key. For example, the preceding file was split at c, resulting in + // splitting of a rangedel [a,f) into [a,c) and [c,f) where [a,c) is + // included in the preceding file. The compact.Iter is at key e (which + // happens to be a point key). The startKey will be c, and nextUserKey + // will be e. We have the opportunity here to split at d. s.shouldSplitCalled = true for s.nextBoundary.key != nil && s.cmp(s.nextBoundary.key, nextUserKey) <= 0 { s.boundaryReached(nextUserKey) @@ -250,9 +262,10 @@ func (s *OutputSplitter) ShouldSplitBefore( // When the target file size limit is very small (in tests), we could end up // splitting at the first key, which is not allowed. - if s.cmp(nextUserKey, s.startKey) <= 0 { + if !s.pastStartKey && s.cmp(nextUserKey, s.startKey) <= 0 { return NoSplit } + s.pastStartKey = true // TODO(radu): it would make for a cleaner interface if we didn't rely on a // lastUserKeyFn. We could make a copy of the key here and split at the next diff --git a/options.go b/options.go index 545d561e08..13f264135a 100644 --- a/options.go +++ b/options.go @@ -697,6 +697,18 @@ type Options struct { // CreateOnSharedLocator). CreateOnShared remote.CreateOnSharedStrategy CreateOnSharedLocator remote.Locator + // SharedLowerUserKeyPrefix, if specified, is an additional lower bound + // constraint on key prefixes that should be written to shared files. + SharedLowerUserKeyPrefix []byte + // WriteSharedWithStrictObsolete specifies that shared sstables are + // written with WriterOptions.IsStrictObsolete set to true. Strict + // obsolete tables do not permit merge keys. + WriteSharedWithStrictObsolete bool + // TODO(sumeer): add ReadSharedRequiresStrictObsolete to require that + // shared files visited via ScanInternal parameter func(sst + // *SharedSSTMeta) must be strict obsolete. That visit only has access to + // FileMetadata, so we will need to encode the StrictObsolete bit in + // there. // CacheSizeBytesBytes is the size of the on-disk block cache for objects // on shared storage in bytes. If it is 0, no cache is used.