Skip to content

Commit

Permalink
db: add SharedLowerUserKeyPrefix and WriteSharedWithStrictObsolete op…
Browse files Browse the repository at this point in the history
…tions

SharedLowerUserKeyPrefix, if specified, is an additional lower bound on
constraint on key prefixes that should be written to shared files. It
applies only when CreateOnShared permits some shared file creation. It
will be used by CockroachDB to exclude keys below TableDataMin from
shared files, for both correctness (they can contain MERGEs for which
the obsolete bit does not work) and performance reasons (low latency
is more important and the data volume is tiny).

WriteSharedWithStrictObsolete, when true, causes shared files to be
written with WriterOptions.IsStrictObsolete set to true. This adds
an extra measure of configuration protection to accidentally sharing
files where the MERGE could become visible (we currently share such
files, but file virtualization hides these MERGEs).

The enforcement of SharedLowerUserKeyPrefix changes how
PreferSharedStorage is computed during flushes and compactions. It
will only be set if the next key to be written by compact.Runner
permits writing to shared storage. compact.Runner optimizes this
computation for when a compaction is fully within the shared or
unshared bounds. Additionally compact.Runner uses the existing
OutputSplitter to decide when a split should happen when transitioning
from unshared to shared.

While here, we do a tiny optimization in OutputSplitter to remove
a key comparison on each iteration key.

Fixes cockroachdb#2756
  • Loading branch information
sumeerbhola committed May 15, 2024
1 parent 18bbac5 commit dc8b5b2
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 14 deletions.
20 changes: 16 additions & 4 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2518,14 +2518,23 @@ func (d *DB) compactAndWrite(
MaxGrandparentOverlapBytes: c.maxOverlapBytes,
TargetOutputFileSize: c.maxOutputFileSize,
}
considerCreateShared := remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level)
if considerCreateShared {
runnerCfg.ConsiderCreateShared = true
runnerCfg.SharedLowerUserKeyPrefix = d.opts.Experimental.SharedLowerUserKeyPrefix
}
runner := compact.NewRunner(runnerCfg, iter)
for runner.MoreDataToWrite() {
for {
moreData, keyShouldBeWrittenToShared := runner.MoreDataToWrite()
if !moreData {
break
}
if c.cancel.Load() {
return runner.Finish().WithError(ErrCancelledCompaction)
}
// Create a new table.
writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level, tableFormat)
objMeta, tw, cpuWorkHandle, err := d.newCompactionOutput(jobID, c, writerOpts)
objMeta, tw, cpuWorkHandle, err := d.newCompactionOutput(jobID, c, writerOpts, keyShouldBeWrittenToShared)
if err != nil {
return runner.Finish().WithError(err)
}
Expand Down Expand Up @@ -2652,7 +2661,7 @@ func (c *compaction) makeVersionEdit(result compact.Result) (*versionEdit, error
// newCompactionOutput creates an object for a new table produced by a
// compaction or flush.
func (d *DB) newCompactionOutput(
jobID JobID, c *compaction, writerOpts sstable.WriterOptions,
jobID JobID, c *compaction, writerOpts sstable.WriterOptions, preferSharedStorage bool,
) (objstorage.ObjectMetadata, *sstable.Writer, CPUWorkHandle, error) {
d.mu.Lock()
diskFileNum := d.mu.versions.getNextDiskFileNum()
Expand Down Expand Up @@ -2689,7 +2698,7 @@ func (d *DB) newCompactionOutput(

// Prefer shared storage if present.
createOpts := objstorage.CreateOptions{
PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level),
PreferSharedStorage: preferSharedStorage,
WriteCategory: writeCategory,
}
writable, objMeta, err := d.objProvider.Create(ctx, fileTypeTable, diskFileNum, createOpts)
Expand Down Expand Up @@ -2721,6 +2730,9 @@ func (d *DB) newCompactionOutput(
d.opts.Experimental.MaxWriterConcurrency > 0 &&
(cpuWorkHandle.Permitted() || d.opts.Experimental.ForceWriterParallelism)

if d.opts.Experimental.WriteSharedWithStrictObsolete && objMeta.IsShared() {
writerOpts.IsStrictObsolete = true
}
tw := sstable.NewWriter(writable, writerOpts, cacheOpts)
return objMeta, tw, cpuWorkHandle, nil
}
Expand Down
11 changes: 11 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1250,6 +1250,17 @@ func finishInitializingIter(ctx context.Context, buf *iterAlloc) *Iterator {
// creator ID was set (as creator IDs are necessary to enable shared storage)
// resulting in some lower level SSTs being on non-shared storage. Skip-shared
// iteration is invalid in those cases.
//
// The above error handling implies that ScanInternal with a non-nil
// VisitSharedFile can only be called without error if both the following
// conditions are true:
//
// - All files in the LSM conform to remote.CreateOnSharedLower strategy (they
// can of course conform to the stronger remote.CreateOnSharedAll).
//
// - If Options.Experimental.SharedLowerUserKeyPrefix is non-nil, the lower
// parameter must have a prefix that is greater than or equal to this lower
// bound.
func (d *DB) ScanInternal(
ctx context.Context,
categoryAndQoS sstable.CategoryAndQoS,
Expand Down
75 changes: 68 additions & 7 deletions internal/compact/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ type RunnerConfig struct {
// during compaction. In practice, the sizes can vary between 50%-200% of this
// value.
TargetOutputFileSize uint64

// ConsiderCreateShared is true if this compaction can write shared files if
// SharedLowerUserKeyPrefix allows.
ConsiderCreateShared bool

// Set equal to Options.Experimental.SharedLowerUserKeyPrefix, when
// ConsiderCreateShared is true, else nil.
SharedLowerUserKeyPrefix []byte
}

// Runner is a helper for running the "data" part of a compaction (where we use
Expand All @@ -97,6 +105,11 @@ type Runner struct {
cfg RunnerConfig
iter *Iter

split base.Split
// At most one state transition from false => true, when the next key should
// be written to shared storage.
keyShouldBeWrittenToShared bool

tables []OutputTable
// Stores any error encountered.
err error
Expand All @@ -113,20 +126,63 @@ type Runner struct {
// NewRunner creates a new Runner.
func NewRunner(cfg RunnerConfig, iter *Iter) *Runner {
r := &Runner{
cmp: iter.cmp,
cfg: cfg,
iter: iter,
cmp: iter.cmp,
cfg: cfg,
iter: iter,
split: iter.cfg.Comparer.Split,
}
if cfg.SharedLowerUserKeyPrefix == nil {
r.keyShouldBeWrittenToShared = cfg.ConsiderCreateShared
} else if r.cmp(cfg.CompactionBounds.Start[:r.split(cfg.CompactionBounds.Start)], cfg.SharedLowerUserKeyPrefix) >= 0 {
r.keyShouldBeWrittenToShared = true
// No more need to do key comparisons, since compaction can always write to shared.
r.cfg.SharedLowerUserKeyPrefix = nil
} else {
// Is the compaction always writing non-shared keys.
endKeyPrefix := base.UserKeyBoundary{
Key: cfg.CompactionBounds.End.Key[:r.split(cfg.CompactionBounds.End.Key)],
Kind: cfg.CompactionBounds.End.Kind,
}
// By taking the prefix, we can turn an exclusive user-key bound into an inclusive user-key prefix bound.
if endKeyPrefix.Kind == base.Exclusive && len(endKeyPrefix.Key) < len(cfg.CompactionBounds.End.Key) {
endKeyPrefix.Kind = base.Inclusive
}
c := r.cmp(endKeyPrefix.Key, cfg.SharedLowerUserKeyPrefix)
if c < 0 || c == 0 && endKeyPrefix.Kind == base.Exclusive {
r.keyShouldBeWrittenToShared = false
// No more need to do key comparisons, since compaction can never write to shared.
r.cfg.SharedLowerUserKeyPrefix = nil
}
}
r.key, r.value = r.iter.First()
return r
}

// MoreDataToWrite returns true if there is more data to be written.
func (r *Runner) MoreDataToWrite() bool {
func (r *Runner) MoreDataToWrite() (moreData bool, keyShouldBeWrittenToShared bool) {
if r.err != nil {
return false
return false, false
}
moreData = r.key != nil || !r.lastRangeDelSpan.Empty() || !r.lastRangeKeySpan.Empty()
if moreData && !r.keyShouldBeWrittenToShared && r.cfg.SharedLowerUserKeyPrefix != nil {
firstKey := base.MinUserKey(r.cmp, spanStartOrNil(&r.lastRangeDelSpan), spanStartOrNil(&r.lastRangeKeySpan))
if r.key != nil && firstKey == nil {
firstKey = r.key.UserKey
}
if firstKey == nil {
panic(base.AssertionFailedf("no data to write"))
}
cmp := r.cmp(r.cfg.SharedLowerUserKeyPrefix, firstKey[:r.split(firstKey)])
if cmp <= 0 {
r.keyShouldBeWrittenToShared = true
// No more need to do key comparisons to stop writing to current sstable.
r.cfg.SharedLowerUserKeyPrefix = nil
}
// Else cmp > 0, so r.cmp(r.cfg.SharedLowerUserKeyPrefix, firstKey) > 0,
// and we can safely use the former as the split-limit in writeKeysToTable
// below.
}
return r.key != nil || !r.lastRangeDelSpan.Empty() || !r.lastRangeKeySpan.Empty()
return moreData, r.keyShouldBeWrittenToShared
}

// WriteTable writes a new output table. This table will be part of
Expand Down Expand Up @@ -168,8 +224,13 @@ func (r *Runner) writeKeysToTable(tw *sstable.Writer) (splitKey []byte, _ error)
if firstKey == nil {
return nil, base.AssertionFailedf("no data to write")
}
tableSplitLimit := r.TableSplitLimit(firstKey)
if r.cfg.SharedLowerUserKeyPrefix != nil &&
(tableSplitLimit == nil || r.cmp(r.cfg.SharedLowerUserKeyPrefix, tableSplitLimit) < 0) {
tableSplitLimit = r.cfg.SharedLowerUserKeyPrefix
}
splitter := NewOutputSplitter(
r.cmp, firstKey, r.TableSplitLimit(firstKey),
r.cmp, firstKey, tableSplitLimit,
r.cfg.TargetOutputFileSize, r.cfg.Grandparents.Iter(), r.iter.Frontiers(),
)
lastUserKeyFn := func() []byte {
Expand Down
19 changes: 16 additions & 3 deletions internal/compact/splitting.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ type OutputSplitter struct {

shouldSplitCalled bool

pastStartKey bool

nextBoundary splitterBoundary
// reachedBoundary is set when the frontier reaches a boundary and is cleared
// in the first ShouldSplitBefore call after that.
Expand Down Expand Up @@ -212,8 +214,18 @@ func (s *OutputSplitter) ShouldSplitBefore(
panic("ShouldSplitBefore called after it returned SplitNow")
}
if !s.shouldSplitCalled {
// The boundary could have been advanced to nextUserKey before the splitter
// was created. So one single time, we advance the boundary manually.
// The boundary could have been advanced to nextUserKey before the
// splitter was created (the compact.Iter was at nextUserKey when a
// previous OutputSplitter decided to split-before). So one single time,
// we advance the boundary manually.
//
// Note that this first nextUserKey can be ahead of
// OutputSplitter.startKey, since the startKey is decided by the previous
// split key. For example, the preceding file was split at c, resulting in
// splitting of a rangedel [a,f) into [a,c) and [c,f) where [a,c) is
// included in the preceding file. The compact.Iter is at key e (which
// happens to be a point key). The startKey will be c, and nextUserKey
// will be e. We have the opportunity here to split at d.
s.shouldSplitCalled = true
for s.nextBoundary.key != nil && s.cmp(s.nextBoundary.key, nextUserKey) <= 0 {
s.boundaryReached(nextUserKey)
Expand Down Expand Up @@ -250,9 +262,10 @@ func (s *OutputSplitter) ShouldSplitBefore(

// When the target file size limit is very small (in tests), we could end up
// splitting at the first key, which is not allowed.
if s.cmp(nextUserKey, s.startKey) <= 0 {
if !s.pastStartKey && s.cmp(nextUserKey, s.startKey) <= 0 {
return NoSplit
}
s.pastStartKey = true

// TODO(radu): it would make for a cleaner interface if we didn't rely on a
// lastUserKeyFn. We could make a copy of the key here and split at the next
Expand Down
12 changes: 12 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,18 @@ type Options struct {
// CreateOnSharedLocator).
CreateOnShared remote.CreateOnSharedStrategy
CreateOnSharedLocator remote.Locator
// SharedLowerUserKeyPrefix, if specified, is an additional lower bound
// constraint on key prefixes that should be written to shared files.
SharedLowerUserKeyPrefix []byte
// WriteSharedWithStrictObsolete specifies that shared sstables are
// written with WriterOptions.IsStrictObsolete set to true. Strict
// obsolete tables do not permit merge keys.
WriteSharedWithStrictObsolete bool
// TODO(sumeer): add ReadSharedRequiresStrictObsolete to require that
// shared files visited via ScanInternal parameter func(sst
// *SharedSSTMeta) must be strict obsolete. That visit only has access to
// FileMetadata, so we will need to encode the StrictObsolete bit in
// there.

// CacheSizeBytesBytes is the size of the on-disk block cache for objects
// on shared storage in bytes. If it is 0, no cache is used.
Expand Down

0 comments on commit dc8b5b2

Please sign in to comment.