forked from cockroachdb/pebble
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscan_internal.go
996 lines (922 loc) · 34.3 KB
/
scan_internal.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.
package pebble
import (
"context"
"fmt"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/objstorage"
)
const (
// In skip-shared iteration mode, keys in levels sharedLevelsStart and greater
// (i.e. lower in the LSM) are skipped.
sharedLevelsStart = 5
)
// ErrInvalidSkipSharedIteration is returned by ScanInternal if it was called
// with a shared file visitor function, and a file in a shareable level (i.e.
// level >= sharedLevelsStart) was found to not be in shared storage according
// to objstorage.Provider, or not shareable for another reason such as for
// containing keys newer than the snapshot sequence number.
var ErrInvalidSkipSharedIteration = errors.New("pebble: cannot use skip-shared iteration due to non-shareable files in lower levels")
// SharedSSTMeta represents an sstable on shared storage that can be ingested
// by another pebble instance. This struct must contain all fields that are
// required for a Pebble instance to ingest a foreign sstable on shared storage,
// including constructing any relevant objstorage.Provider / remoteobjcat.Catalog
// data structures, as well as creating virtual FileMetadatas.
//
// Note that the Pebble instance creating and returning a SharedSSTMeta might
// not be the one that created the underlying sstable on shared storage to begin
// with; it's possible for a Pebble instance to reshare an sstable that was
// shared to it.
type SharedSSTMeta struct {
// Backing is the shared object underlying this SST. Can be attached to an
// objstorage.Provider.
Backing objstorage.RemoteObjectBackingHandle
// Smallest and Largest internal keys for the overall bounds. The kind and
// SeqNum of these will reflect what is physically present on the source Pebble
// instance's view of the sstable; it's up to the ingesting instance to set the
// sequence number in the trailer to match the read-time sequence numbers
// reserved for the level this SST is being ingested into. The Kind is expected
// to remain unchanged by the ingesting instance.
//
// Note that these bounds could be narrower than the bounds of the underlying
// sstable; ScanInternal is expected to truncate sstable bounds to the user key
// bounds passed into that method.
Smallest, Largest InternalKey
// SmallestRangeKey and LargestRangeKey are internal keys that denote the
// range key bounds of this sstable. Must lie within [Smallest, Largest].
SmallestRangeKey, LargestRangeKey InternalKey
// SmallestPointKey and LargestPointKey are internal keys that denote the
// point key bounds of this sstable. Must lie within [Smallest, Largest].
SmallestPointKey, LargestPointKey InternalKey
// Level denotes the level at which this file was present at read time.
// For files visited by ScanInternal, this value will only be 5 or 6.
Level uint8
// Size contains an estimate of the size of this sstable.
Size uint64
// fileNum at time of creation in the creator instance. Only used for
// debugging/tests.
fileNum base.FileNum
}
func (s *SharedSSTMeta) cloneFromFileMeta(f *fileMetadata) {
*s = SharedSSTMeta{
Smallest: f.Smallest.Clone(),
Largest: f.Largest.Clone(),
SmallestRangeKey: f.SmallestRangeKey.Clone(),
LargestRangeKey: f.LargestRangeKey.Clone(),
SmallestPointKey: f.SmallestPointKey.Clone(),
LargestPointKey: f.LargestPointKey.Clone(),
Size: f.Size,
fileNum: f.FileNum,
}
}
type sharedByLevel []SharedSSTMeta
func (s sharedByLevel) Len() int { return len(s) }
func (s sharedByLevel) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s sharedByLevel) Less(i, j int) bool { return s[i].Level < s[j].Level }
type pcIterPos int
const (
pcIterPosCur pcIterPos = iota
pcIterPosNext
)
// pointCollapsingIterator is an internalIterator that collapses point keys and
// returns at most one point internal key for each user key. Merges and
// SingleDels are not supported and result in a panic if encountered. Point keys
// deleted by rangedels are considered shadowed and not exposed.
//
// Only used in ScanInternal to return at most one internal key per user key.
type pointCollapsingIterator struct {
iter keyspan.InterleavingIter
pos pcIterPos
comparer *base.Comparer
merge base.Merge
err error
seqNum uint64
// The current position of `iter`. Always owned by the underlying iter.
iterKey *InternalKey
// The last saved key. findNextEntry and similar methods are expected to save
// the current value of iterKey to savedKey if they're iterating away from the
// current key but still need to retain it. See comments in findNextEntry on
// how this field is used.
//
// At the end of a positioning call:
// - if pos == pcIterPosNext, iterKey is pointing to the next user key owned
// by `iter` while savedKey is holding a copy to our current key.
// - If pos == pcIterPosCur, iterKey is pointing to an `iter`-owned current
// key, and savedKey is either undefined or pointing to a version of the
// current key owned by this iterator (i.e. backed by savedKeyBuf).
savedKey InternalKey
savedKeyBuf []byte
// Value at the current iterator position, at iterKey.
iterValue base.LazyValue
// If fixedSeqNum is non-zero, all emitted points are verified to have this
// fixed sequence number.
fixedSeqNum uint64
}
func (p *pointCollapsingIterator) Span() *keyspan.Span {
return p.iter.Span()
}
// SeekPrefixGE implements the InternalIterator interface.
func (p *pointCollapsingIterator) SeekPrefixGE(
prefix, key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, base.LazyValue) {
p.resetKey()
p.iterKey, p.iterValue = p.iter.SeekPrefixGE(prefix, key, flags)
p.pos = pcIterPosCur
if p.iterKey == nil {
return nil, base.LazyValue{}
}
return p.findNextEntry()
}
// SeekGE implements the InternalIterator interface.
func (p *pointCollapsingIterator) SeekGE(
key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, base.LazyValue) {
p.resetKey()
p.iterKey, p.iterValue = p.iter.SeekGE(key, flags)
p.pos = pcIterPosCur
if p.iterKey == nil {
return nil, base.LazyValue{}
}
return p.findNextEntry()
}
// SeekLT implements the InternalIterator interface.
func (p *pointCollapsingIterator) SeekLT(
key []byte, flags base.SeekLTFlags,
) (*base.InternalKey, base.LazyValue) {
panic("unimplemented")
}
func (p *pointCollapsingIterator) resetKey() {
p.savedKey.UserKey = p.savedKeyBuf[:0]
p.savedKey.Trailer = 0
p.iterKey = nil
p.pos = pcIterPosCur
}
func (p *pointCollapsingIterator) verifySeqNum(key *base.InternalKey) *base.InternalKey {
if !invariants.Enabled {
return key
}
if p.fixedSeqNum == 0 || key == nil || key.Kind() == InternalKeyKindRangeDelete {
return key
}
if key.SeqNum() != p.fixedSeqNum {
panic(fmt.Sprintf("expected foreign point key to have seqnum %d, got %d", p.fixedSeqNum, key.SeqNum()))
}
return key
}
// findNextEntry is called to return the next key. p.iter must be positioned at the
// start of the first user key we are interested in.
func (p *pointCollapsingIterator) findNextEntry() (*base.InternalKey, base.LazyValue) {
p.saveKey()
// Saves a comparison in the fast path
firstIteration := true
for p.iterKey != nil {
// NB: p.savedKey is either the current key (iff p.iterKey == firstKey),
// or the previous key.
if !firstIteration && !p.comparer.Equal(p.iterKey.UserKey, p.savedKey.UserKey) {
p.saveKey()
continue
}
firstIteration = false
if s := p.iter.Span(); s != nil && s.CoversAt(p.seqNum, p.iterKey.SeqNum()) {
// All future keys for this user key must be deleted.
if p.savedKey.Kind() == InternalKeyKindSingleDelete {
panic("cannot process singledel key in point collapsing iterator")
}
// Fast forward to the next user key.
p.saveKey()
p.iterKey, p.iterValue = p.iter.Next()
for p.iterKey != nil && p.savedKey.SeqNum() >= p.iterKey.SeqNum() && p.comparer.Equal(p.iterKey.UserKey, p.savedKey.UserKey) {
p.iterKey, p.iterValue = p.iter.Next()
}
continue
}
switch p.savedKey.Kind() {
case InternalKeyKindSet, InternalKeyKindDelete, InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized:
// Note that we return SETs directly, even if they would otherwise get
// compacted into a Del to turn into a SetWithDelete. This is a fast
// path optimization that can break SINGLEDEL determinism. To lead to
// consistent SINGLEDEL behaviour, this iterator should *not* be used for
// a keyspace where SINGLEDELs could be in use. If this iterator observes
// a SINGLEDEL as the first internal key for a user key, it will panic.
//
// As p.value is a lazy value owned by the child iterator, we can thread
// it through without loading it into p.valueBuf.
//
// TODO(bilal): We can even avoid saving the key in this fast path if
// we are in a block where setHasSamePrefix = false in a v3 sstable,
// guaranteeing that there's only one internal key for each user key.
// Thread this logic through the sstable iterators and/or consider
// collapsing (ha) this logic into the sstable iterators that are aware
// of blocks and can determine user key changes without doing key saves
// or comparisons.
p.pos = pcIterPosCur
return p.verifySeqNum(p.iterKey), p.iterValue
case InternalKeyKindSingleDelete:
// Panic, as this iterator is not expected to observe single deletes.
panic("cannot process singledel key in point collapsing iterator")
case InternalKeyKindMerge:
// Panic, as this iterator is not expected to observe merges.
panic("cannot process merge key in point collapsing iterator")
case InternalKeyKindRangeDelete:
// These are interleaved by the interleaving iterator ahead of all points.
// We should pass them as-is, but also account for any points ahead of
// them.
p.pos = pcIterPosCur
return p.verifySeqNum(p.iterKey), p.iterValue
default:
panic(fmt.Sprintf("unexpected kind: %d", p.iterKey.Kind()))
}
}
p.resetKey()
return nil, base.LazyValue{}
}
// First implements the InternalIterator interface.
func (p *pointCollapsingIterator) First() (*base.InternalKey, base.LazyValue) {
p.resetKey()
p.iterKey, p.iterValue = p.iter.First()
p.pos = pcIterPosCur
if p.iterKey == nil {
return nil, base.LazyValue{}
}
return p.findNextEntry()
}
// Last implements the InternalIterator interface.
func (p *pointCollapsingIterator) Last() (*base.InternalKey, base.LazyValue) {
panic("unimplemented")
}
func (p *pointCollapsingIterator) saveKey() {
if p.iterKey == nil {
p.savedKey = InternalKey{UserKey: p.savedKeyBuf[:0]}
return
}
p.savedKeyBuf = append(p.savedKeyBuf[:0], p.iterKey.UserKey...)
p.savedKey = InternalKey{UserKey: p.savedKeyBuf, Trailer: p.iterKey.Trailer}
}
// Next implements the InternalIterator interface.
func (p *pointCollapsingIterator) Next() (*base.InternalKey, base.LazyValue) {
switch p.pos {
case pcIterPosCur:
p.saveKey()
if p.iterKey != nil && p.iterKey.Kind() == InternalKeyKindRangeDelete {
// Step over the interleaved range delete and process the very next
// internal key, even if it's at the same user key. This is because a
// point for that user key has not been returned yet.
p.iterKey, p.iterValue = p.iter.Next()
break
}
// Fast forward to the next user key.
key, val := p.iter.Next()
// p.iterKey.SeqNum() >= key.SeqNum() is an optimization that allows us to
// use p.iterKey.SeqNum() < key.SeqNum() as a sign that the user key has
// changed, without needing to do the full key comparison.
for key != nil && p.savedKey.SeqNum() >= key.SeqNum() &&
p.comparer.Equal(p.savedKey.UserKey, key.UserKey) {
key, val = p.iter.Next()
}
if key == nil {
// There are no keys to return.
p.resetKey()
return nil, base.LazyValue{}
}
p.iterKey, p.iterValue = key, val
case pcIterPosNext:
p.pos = pcIterPosCur
}
if p.iterKey == nil {
p.resetKey()
return nil, base.LazyValue{}
}
return p.findNextEntry()
}
// NextPrefix implements the InternalIterator interface.
func (p *pointCollapsingIterator) NextPrefix(succKey []byte) (*base.InternalKey, base.LazyValue) {
panic("unimplemented")
}
// Prev implements the InternalIterator interface.
func (p *pointCollapsingIterator) Prev() (*base.InternalKey, base.LazyValue) {
panic("unimplemented")
}
// Error implements the InternalIterator interface.
func (p *pointCollapsingIterator) Error() error {
if p.err != nil {
return p.err
}
return p.iter.Error()
}
// Close implements the InternalIterator interface.
func (p *pointCollapsingIterator) Close() error {
return p.iter.Close()
}
// SetBounds implements the InternalIterator interface.
func (p *pointCollapsingIterator) SetBounds(lower, upper []byte) {
p.resetKey()
p.iter.SetBounds(lower, upper)
}
// String implements the InternalIterator interface.
func (p *pointCollapsingIterator) String() string {
return p.iter.String()
}
var _ internalIterator = &pointCollapsingIterator{}
// IteratorLevelKind is used to denote whether the current ScanInternal iterator
// is unknown, belongs to a flushable, or belongs to an LSM level type.
type IteratorLevelKind int8
const (
// IteratorLevelUnknown indicates an unknown LSM level.
IteratorLevelUnknown IteratorLevelKind = iota
// IteratorLevelLSM indicates an LSM level.
IteratorLevelLSM
// IteratorLevelFlushable indicates a flushable (i.e. memtable).
IteratorLevelFlushable
)
// IteratorLevel is used with scanInternalIterator to surface additional iterator-specific info where possible.
// Note: this is struct is only provided for point keys.
type IteratorLevel struct {
Kind IteratorLevelKind
// FlushableIndex indicates the position within the flushable queue of this level.
// Only valid if kind == IteratorLevelFlushable.
FlushableIndex int
// The level within the LSM. Only valid if Kind == IteratorLevelLSM.
Level int
// Sublevel is only valid if Kind == IteratorLevelLSM and Level == 0.
Sublevel int
}
// scanInternalIterator is an iterator that returns all internal keys, including
// tombstones. For instance, an InternalKeyKindDelete would be returned as an
// InternalKeyKindDelete instead of the iterator skipping over to the next key.
// Internal keys within a user key are collapsed, eg. if there are two SETs, the
// one with the higher sequence is returned. Useful if an external user of Pebble
// needs to observe and rebuild Pebble's history of internal keys, such as in
// node-to-node replication. For use with {db,snapshot}.ScanInternal().
//
// scanInternalIterator is expected to ignore point keys deleted by range
// deletions, and range keys shadowed by a range key unset or delete, however it
// *must* return the range delete as well as the range key unset/delete that did
// the shadowing.
type scanInternalIterator struct {
db *DB
opts scanInternalOptions
comparer *base.Comparer
merge Merge
iter internalIterator
readState *readState
version *version
rangeKey *iteratorRangeKeyState
pointKeyIter internalIterator
iterKey *InternalKey
iterValue LazyValue
alloc *iterAlloc
newIters tableNewIters
newIterRangeKey keyspan.TableNewSpanIter
seqNum uint64
iterLevels []IteratorLevel
mergingIter *mergingIter
// boundsBuf holds two buffers used to store the lower and upper bounds.
// Whenever the InternalIterator's bounds change, the new bounds are copied
// into boundsBuf[boundsBufIdx]. The two bounds share a slice to reduce
// allocations. opts.LowerBound and opts.UpperBound point into this slice.
boundsBuf [2][]byte
boundsBufIdx int
}
// truncateSharedFile truncates a shared file's [Smallest, Largest] fields to
// [lower, upper), potentially opening iterators on the file to find keys within
// the requested bounds. A SharedSSTMeta is produced that is suitable for
// external consumption by other Pebble instances. If shouldSkip is true, this
// file does not contain any keys in [lower, upper) and can be skipped.
//
// TODO(bilal): If opening iterators and doing reads in this method is too
// inefficient, consider producing non-tight file bounds instead.
func (d *DB) truncateSharedFile(
ctx context.Context,
lower, upper []byte,
level int,
file *fileMetadata,
objMeta objstorage.ObjectMetadata,
) (sst *SharedSSTMeta, shouldSkip bool, err error) {
cmp := d.cmp
sst = &SharedSSTMeta{}
sst.cloneFromFileMeta(file)
sst.Level = uint8(level)
sst.Backing, err = d.objProvider.RemoteObjectBacking(&objMeta)
if err != nil {
return nil, false, err
}
needsLowerTruncate := cmp(lower, file.Smallest.UserKey) > 0
needsUpperTruncate := cmp(upper, file.Largest.UserKey) < 0 || (cmp(upper, file.Largest.UserKey) == 0 && !file.Largest.IsExclusiveSentinel())
// Fast path: file is entirely within [lower, upper).
if !needsLowerTruncate && !needsUpperTruncate {
return sst, false, nil
}
// We will need to truncate file bounds in at least one direction. Open all
// relevant iterators.
iter, rangeDelIter, err := d.newIters(ctx, file, &IterOptions{
LowerBound: lower,
UpperBound: upper,
level: manifest.Level(level),
}, internalIterOpts{})
if err != nil {
return nil, false, err
}
defer iter.Close()
if rangeDelIter != nil {
rangeDelIter = keyspan.Truncate(
cmp, rangeDelIter, lower, upper, nil, nil,
false, /* panicOnUpperTruncate */
)
defer rangeDelIter.Close()
}
rangeKeyIter, err := d.tableNewRangeKeyIter(file, keyspan.SpanIterOptions{})
if err != nil {
return nil, false, err
}
if rangeKeyIter != nil {
rangeKeyIter = keyspan.Truncate(
cmp, rangeKeyIter, lower, upper, nil, nil,
false, /* panicOnUpperTruncate */
)
defer rangeKeyIter.Close()
}
// Check if we need to truncate on the left side. This means finding a new
// LargestPointKey and LargestRangeKey that is >= lower.
if needsLowerTruncate {
sst.SmallestPointKey.UserKey = sst.SmallestPointKey.UserKey[:0]
sst.SmallestPointKey.Trailer = 0
key, _ := iter.SeekGE(lower, base.SeekGEFlagsNone)
foundPointKey := key != nil
if key != nil {
sst.SmallestPointKey.CopyFrom(*key)
}
if rangeDelIter != nil {
span := rangeDelIter.SeekGE(lower)
if span != nil && (len(sst.SmallestPointKey.UserKey) == 0 || base.InternalCompare(cmp, span.SmallestKey(), sst.SmallestPointKey) < 0) {
sst.SmallestPointKey.CopyFrom(span.SmallestKey())
foundPointKey = true
}
}
if !foundPointKey {
// There are no point keys in the span we're interested in.
sst.SmallestPointKey = InternalKey{}
sst.LargestPointKey = InternalKey{}
}
sst.SmallestRangeKey.UserKey = sst.SmallestRangeKey.UserKey[:0]
sst.SmallestRangeKey.Trailer = 0
if rangeKeyIter != nil {
span := rangeKeyIter.SeekGE(lower)
if span != nil {
sst.SmallestRangeKey.CopyFrom(span.SmallestKey())
} else {
// There are no range keys in the span we're interested in.
sst.SmallestRangeKey = InternalKey{}
sst.LargestRangeKey = InternalKey{}
}
}
}
// Check if we need to truncate on the right side. This means finding a new
// LargestPointKey and LargestRangeKey that is < upper.
if needsUpperTruncate {
sst.LargestPointKey.UserKey = sst.LargestPointKey.UserKey[:0]
sst.LargestPointKey.Trailer = 0
key, _ := iter.SeekLT(upper, base.SeekLTFlagsNone)
foundPointKey := key != nil
if key != nil {
sst.LargestPointKey.CopyFrom(*key)
}
if rangeDelIter != nil {
span := rangeDelIter.SeekLT(upper)
if span != nil && (len(sst.LargestPointKey.UserKey) == 0 || base.InternalCompare(cmp, span.LargestKey(), sst.LargestPointKey) > 0) {
sst.LargestPointKey.CopyFrom(span.LargestKey())
foundPointKey = true
}
}
if !foundPointKey {
// There are no point keys in the span we're interested in.
sst.SmallestPointKey = InternalKey{}
sst.LargestPointKey = InternalKey{}
}
sst.LargestRangeKey.UserKey = sst.LargestRangeKey.UserKey[:0]
sst.LargestRangeKey.Trailer = 0
if rangeKeyIter != nil {
span := rangeKeyIter.SeekLT(upper)
if span != nil {
sst.LargestRangeKey.CopyFrom(span.LargestKey())
} else {
// There are no range keys in the span we're interested in.
sst.SmallestRangeKey = InternalKey{}
sst.LargestRangeKey = InternalKey{}
}
}
}
// Set overall bounds based on {Smallest,Largest}{Point,Range}Key.
switch {
case len(sst.SmallestRangeKey.UserKey) == 0:
sst.Smallest = sst.SmallestPointKey
case len(sst.SmallestPointKey.UserKey) == 0:
sst.Smallest = sst.SmallestRangeKey
default:
sst.Smallest = sst.SmallestPointKey
if base.InternalCompare(cmp, sst.SmallestRangeKey, sst.SmallestPointKey) < 0 {
sst.Smallest = sst.SmallestRangeKey
}
}
switch {
case len(sst.LargestRangeKey.UserKey) == 0:
sst.Largest = sst.LargestPointKey
case len(sst.LargestPointKey.UserKey) == 0:
sst.Largest = sst.LargestRangeKey
default:
sst.Largest = sst.LargestPointKey
if base.InternalCompare(cmp, sst.LargestRangeKey, sst.LargestPointKey) > 0 {
sst.Largest = sst.LargestRangeKey
}
}
// On rare occasion, a file might overlap with [lower, upper) but not actually
// have any keys within those bounds. Skip such files.
if len(sst.Smallest.UserKey) == 0 {
return nil, true, nil
}
sst.Size, err = d.tableCache.estimateSize(file, sst.Smallest.UserKey, sst.Largest.UserKey)
if err != nil {
return nil, false, err
}
// On occasion, estimateSize gives us a low estimate, i.e. a 0 file size. This
// can cause panics in places where we divide by file sizes. Correct for it
// here.
if sst.Size == 0 {
sst.Size = 1
}
return sst, false, nil
}
func scanInternalImpl(
ctx context.Context, lower, upper []byte, iter *scanInternalIterator, opts *scanInternalOptions,
) error {
if opts.visitSharedFile != nil && (lower == nil || upper == nil) {
panic("lower and upper bounds must be specified in skip-shared iteration mode")
}
// Before starting iteration, check if any files in levels sharedLevelsStart
// and below are *not* shared. Error out if that is the case, as skip-shared
// iteration will not produce a consistent point-in-time view of this range
// of keys. For files that are shared, call visitSharedFile with a truncated
// version of that file.
cmp := iter.comparer.Compare
provider := iter.db.ObjProvider()
seqNum := iter.seqNum
current := iter.version
if current == nil {
current = iter.readState.current
}
if opts.visitSharedFile != nil {
if provider == nil {
panic("expected non-nil Provider in skip-shared iteration mode")
}
for level := sharedLevelsStart; level < numLevels; level++ {
files := current.Levels[level].Iter()
for f := files.SeekGE(cmp, lower); f != nil && cmp(f.Smallest.UserKey, upper) < 0; f = files.Next() {
var objMeta objstorage.ObjectMetadata
var err error
objMeta, err = provider.Lookup(fileTypeTable, f.FileBacking.DiskFileNum)
if err != nil {
return err
}
if !objMeta.IsShared() {
return errors.Wrapf(ErrInvalidSkipSharedIteration, "file %s is not shared", objMeta.DiskFileNum)
}
if !base.Visible(f.LargestSeqNum, seqNum, base.InternalKeySeqNumMax) {
return errors.Wrapf(ErrInvalidSkipSharedIteration, "file %s contains keys newer than snapshot", objMeta.DiskFileNum)
}
var sst *SharedSSTMeta
var skip bool
sst, skip, err = iter.db.truncateSharedFile(ctx, lower, upper, level, f, objMeta)
if err != nil {
return err
}
if skip {
continue
}
if err = opts.visitSharedFile(sst); err != nil {
return err
}
}
}
}
for valid := iter.seekGE(lower); valid && iter.error() == nil; valid = iter.next() {
key := iter.unsafeKey()
if opts.rateLimitFunc != nil {
if err := opts.rateLimitFunc(key, iter.lazyValue()); err != nil {
return err
}
}
switch key.Kind() {
case InternalKeyKindRangeKeyDelete, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeySet:
if opts.visitRangeKey != nil {
span := iter.unsafeSpan()
if err := opts.visitRangeKey(span.Start, span.End, span.Keys); err != nil {
return err
}
}
case InternalKeyKindRangeDelete:
if opts.visitRangeDel != nil {
rangeDel := iter.unsafeRangeDel()
if err := opts.visitRangeDel(rangeDel.Start, rangeDel.End, rangeDel.LargestSeqNum()); err != nil {
return err
}
}
default:
if opts.visitPointKey != nil {
var info IteratorLevel
if len(iter.mergingIter.heap.items) > 0 {
mergingIterIdx := iter.mergingIter.heap.items[0].index
info = iter.iterLevels[mergingIterIdx]
} else {
info = IteratorLevel{Kind: IteratorLevelUnknown}
}
val := iter.lazyValue()
if err := opts.visitPointKey(key, val, info); err != nil {
return err
}
}
}
}
return nil
}
// constructPointIter constructs a merging iterator and sets i.iter to it.
func (i *scanInternalIterator) constructPointIter(memtables flushableList, buf *iterAlloc) {
// Merging levels and levels from iterAlloc.
mlevels := buf.mlevels[:0]
levels := buf.levels[:0]
// We compute the number of levels needed ahead of time and reallocate a slice if
// the array from the iterAlloc isn't large enough. Doing this allocation once
// should improve the performance.
numMergingLevels := len(memtables)
numLevelIters := 0
current := i.version
if current == nil {
current = i.readState.current
}
numMergingLevels += len(current.L0SublevelFiles)
numLevelIters += len(current.L0SublevelFiles)
for level := 1; level < len(current.Levels); level++ {
if current.Levels[level].Empty() {
continue
}
if i.opts.skipSharedLevels && level >= sharedLevelsStart {
continue
}
numMergingLevels++
numLevelIters++
}
if numMergingLevels > cap(mlevels) {
mlevels = make([]mergingIterLevel, 0, numMergingLevels)
}
if numLevelIters > cap(levels) {
levels = make([]levelIter, 0, numLevelIters)
}
// TODO(bilal): Push these into the iterAlloc buf.
var rangeDelMiter keyspan.MergingIter
rangeDelIters := make([]keyspan.FragmentIterator, 0, numMergingLevels)
rangeDelLevels := make([]keyspan.LevelIter, 0, numLevelIters)
i.iterLevels = make([]IteratorLevel, numMergingLevels)
mlevelsIndex := 0
// Next are the memtables.
for j := len(memtables) - 1; j >= 0; j-- {
mem := memtables[j]
mlevels = append(mlevels, mergingIterLevel{
iter: mem.newIter(&i.opts.IterOptions),
})
i.iterLevels[mlevelsIndex] = IteratorLevel{
Kind: IteratorLevelFlushable,
FlushableIndex: j,
}
mlevelsIndex++
if rdi := mem.newRangeDelIter(&i.opts.IterOptions); rdi != nil {
rangeDelIters = append(rangeDelIters, rdi)
}
}
// Next are the file levels: L0 sub-levels followed by lower levels.
levelsIndex := len(levels)
mlevels = mlevels[:numMergingLevels]
levels = levels[:numLevelIters]
rangeDelLevels = rangeDelLevels[:numLevelIters]
i.opts.IterOptions.snapshotForHideObsoletePoints = i.seqNum
addLevelIterForFiles := func(files manifest.LevelIterator, level manifest.Level) {
li := &levels[levelsIndex]
rli := &rangeDelLevels[levelsIndex]
li.init(
context.Background(), i.opts.IterOptions, i.comparer, i.newIters, files, level,
internalIterOpts{})
li.initBoundaryContext(&mlevels[mlevelsIndex].levelIterBoundaryContext)
mlevels[mlevelsIndex].iter = li
rli.Init(keyspan.SpanIterOptions{RangeKeyFilters: i.opts.RangeKeyFilters},
i.comparer.Compare, tableNewRangeDelIter(context.Background(), i.newIters), files, level,
manifest.KeyTypePoint)
rangeDelIters = append(rangeDelIters, rli)
levelsIndex++
mlevelsIndex++
}
for j := len(current.L0SublevelFiles) - 1; j >= 0; j-- {
i.iterLevels[mlevelsIndex] = IteratorLevel{
Kind: IteratorLevelLSM,
Level: 0,
Sublevel: j,
}
addLevelIterForFiles(current.L0SublevelFiles[j].Iter(), manifest.L0Sublevel(j))
}
// Add level iterators for the non-empty non-L0 levels.
for level := 1; level < numLevels; level++ {
if current.Levels[level].Empty() {
continue
}
if i.opts.skipSharedLevels && level >= sharedLevelsStart {
continue
}
i.iterLevels[mlevelsIndex] = IteratorLevel{Kind: IteratorLevelLSM, Level: level}
addLevelIterForFiles(current.Levels[level].Iter(), manifest.Level(level))
}
buf.merging.init(&i.opts.IterOptions, &InternalIteratorStats{}, i.comparer.Compare, i.comparer.Split, mlevels...)
buf.merging.snapshot = i.seqNum
rangeDelMiter.Init(i.comparer.Compare, keyspan.VisibleTransform(i.seqNum), new(keyspan.MergingBuffers), rangeDelIters...)
if i.opts.includeObsoleteKeys {
iiter := &keyspan.InterleavingIter{}
iiter.Init(i.comparer, &buf.merging, &rangeDelMiter,
keyspan.InterleavingIterOpts{
LowerBound: i.opts.LowerBound,
UpperBound: i.opts.UpperBound,
})
i.pointKeyIter = iiter
} else {
pcIter := &pointCollapsingIterator{
comparer: i.comparer,
merge: i.merge,
seqNum: i.seqNum,
}
pcIter.iter.Init(i.comparer, &buf.merging, &rangeDelMiter, keyspan.InterleavingIterOpts{
LowerBound: i.opts.LowerBound,
UpperBound: i.opts.UpperBound,
})
i.pointKeyIter = pcIter
}
i.iter = i.pointKeyIter
}
// constructRangeKeyIter constructs the range-key iterator stack, populating
// i.rangeKey.rangeKeyIter with the resulting iterator. This is similar to
// Iterator.constructRangeKeyIter, except it doesn't handle batches and ensures
// iterConfig does *not* elide unsets/deletes.
func (i *scanInternalIterator) constructRangeKeyIter() {
// We want the bounded iter from iterConfig, but not the collapsing of
// RangeKeyUnsets and RangeKeyDels.
i.rangeKey.rangeKeyIter = i.rangeKey.iterConfig.Init(
i.comparer, i.seqNum, i.opts.LowerBound, i.opts.UpperBound,
nil /* hasPrefix */, nil /* prefix */, false, /* onlySets */
&i.rangeKey.rangeKeyBuffers.internal)
// Next are the flushables: memtables and large batches.
if i.readState != nil {
for j := len(i.readState.memtables) - 1; j >= 0; j-- {
mem := i.readState.memtables[j]
// We only need to read from memtables which contain sequence numbers older
// than seqNum.
if logSeqNum := mem.logSeqNum; logSeqNum >= i.seqNum {
continue
}
if rki := mem.newRangeKeyIter(&i.opts.IterOptions); rki != nil {
i.rangeKey.iterConfig.AddLevel(rki)
}
}
}
current := i.version
if current == nil {
current = i.readState.current
}
// Next are the file levels: L0 sub-levels followed by lower levels.
//
// Add file-specific iterators for L0 files containing range keys. This is less
// efficient than using levelIters for sublevels of L0 files containing
// range keys, but range keys are expected to be sparse anyway, reducing the
// cost benefit of maintaining a separate L0Sublevels instance for range key
// files and then using it here.
//
// NB: We iterate L0's files in reverse order. They're sorted by
// LargestSeqNum ascending, and we need to add them to the merging iterator
// in LargestSeqNum descending to preserve the merging iterator's invariants
// around Key Trailer order.
iter := current.RangeKeyLevels[0].Iter()
for f := iter.Last(); f != nil; f = iter.Prev() {
spanIter, err := i.newIterRangeKey(f, i.opts.SpanIterOptions())
if err != nil {
i.rangeKey.iterConfig.AddLevel(&errorKeyspanIter{err: err})
continue
}
i.rangeKey.iterConfig.AddLevel(spanIter)
}
// Add level iterators for the non-empty non-L0 levels.
for level := 1; level < len(current.RangeKeyLevels); level++ {
if current.RangeKeyLevels[level].Empty() {
continue
}
if i.opts.skipSharedLevels && level >= sharedLevelsStart {
continue
}
li := i.rangeKey.iterConfig.NewLevelIter()
spanIterOpts := i.opts.SpanIterOptions()
li.Init(spanIterOpts, i.comparer.Compare, i.newIterRangeKey, current.RangeKeyLevels[level].Iter(),
manifest.Level(level), manifest.KeyTypeRange)
i.rangeKey.iterConfig.AddLevel(li)
}
}
// seekGE seeks this iterator to the first key that's greater than or equal
// to the specified user key.
func (i *scanInternalIterator) seekGE(key []byte) bool {
i.iterKey, i.iterValue = i.iter.SeekGE(key, base.SeekGEFlagsNone)
return i.iterKey != nil
}
// unsafeKey returns the unsafe InternalKey at the current position. The value
// is nil if the iterator is invalid or exhausted.
func (i *scanInternalIterator) unsafeKey() *InternalKey {
return i.iterKey
}
// lazyValue returns a value pointer to the value at the current iterator
// position. Behaviour undefined if unsafeKey() returns a Range key or Rangedel
// kind key.
func (i *scanInternalIterator) lazyValue() LazyValue {
return i.iterValue
}
// unsafeRangeDel returns a range key span. Behaviour undefined if UnsafeKey returns
// a non-rangedel kind.
func (i *scanInternalIterator) unsafeRangeDel() *keyspan.Span {
type spanInternalIterator interface {
Span() *keyspan.Span
}
return i.pointKeyIter.(spanInternalIterator).Span()
}
// unsafeSpan returns a range key span. Behaviour undefined if UnsafeKey returns
// a non-rangekey type.
func (i *scanInternalIterator) unsafeSpan() *keyspan.Span {
return i.rangeKey.iiter.Span()
}
// next advances the iterator in the forward direction, and returns the
// iterator's new validity state.
func (i *scanInternalIterator) next() bool {
i.iterKey, i.iterValue = i.iter.Next()
return i.iterKey != nil
}
// error returns an error from the internal iterator, if there's any.
func (i *scanInternalIterator) error() error {
return i.iter.Error()
}
// close closes this iterator, and releases any pooled objects.
func (i *scanInternalIterator) close() error {
if err := i.iter.Close(); err != nil {
return err
}
if i.readState != nil {
i.readState.unref()
}
if i.version != nil {
i.version.Unref()
}
if i.rangeKey != nil {
i.rangeKey.PrepareForReuse()
*i.rangeKey = iteratorRangeKeyState{
rangeKeyBuffers: i.rangeKey.rangeKeyBuffers,
}
iterRangeKeyStateAllocPool.Put(i.rangeKey)
i.rangeKey = nil
}
if alloc := i.alloc; alloc != nil {
for j := range i.boundsBuf {
if cap(i.boundsBuf[j]) >= maxKeyBufCacheSize {
alloc.boundsBuf[j] = nil
} else {
alloc.boundsBuf[j] = i.boundsBuf[j]
}
}
*alloc = iterAlloc{
keyBuf: alloc.keyBuf[:0],
boundsBuf: alloc.boundsBuf,
prefixOrFullSeekKey: alloc.prefixOrFullSeekKey[:0],
}
iterAllocPool.Put(alloc)
i.alloc = nil
}
return nil
}
func (i *scanInternalIterator) initializeBoundBufs(lower, upper []byte) {
buf := i.boundsBuf[i.boundsBufIdx][:0]
if lower != nil {
buf = append(buf, lower...)
i.opts.LowerBound = buf
} else {
i.opts.LowerBound = nil
}
if upper != nil {
buf = append(buf, upper...)
i.opts.UpperBound = buf[len(buf)-len(upper):]
} else {
i.opts.UpperBound = nil
}
i.boundsBuf[i.boundsBufIdx] = buf
i.boundsBufIdx = 1 - i.boundsBufIdx
}