diff --git a/objstorage/objstorageprovider/shared.go b/objstorage/objstorageprovider/shared.go
index cdb433eabd..c017e6d539 100644
--- a/objstorage/objstorageprovider/shared.go
+++ b/objstorage/objstorageprovider/shared.go
@@ -240,7 +240,7 @@ func (p *provider) sharedOpenForReading(
 		}
 		return nil, err
 	}
-	return newSharedReadable(reader, size), nil
+	return p.newSharedReadable(reader, size, meta.DiskFileNum), nil
 }
 
 func (p *provider) sharedSize(meta objstorage.ObjectMetadata) (int64, error) {
diff --git a/objstorage/objstorageprovider/shared_readable.go b/objstorage/objstorageprovider/shared_readable.go
index bd68fb543a..5beac7a6e1 100644
--- a/objstorage/objstorageprovider/shared_readable.go
+++ b/objstorage/objstorageprovider/shared_readable.go
@@ -7,7 +7,9 @@ package objstorageprovider
 import (
 	"context"
 
+	"github.com/cockroachdb/pebble/internal/base"
 	"github.com/cockroachdb/pebble/objstorage"
+	"github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
 	"github.com/cockroachdb/pebble/objstorage/shared"
 )
 
@@ -16,18 +18,40 @@ import (
 type sharedReadable struct {
 	objReader shared.ObjectReader
 	size      int64
+	fileNum   base.DiskFileNum
+	provider  *provider
 }
 
 var _ objstorage.Readable = (*sharedReadable)(nil)
 
-func newSharedReadable(objReader shared.ObjectReader, size int64) *sharedReadable {
+func (p *provider) newSharedReadable(
+	objReader shared.ObjectReader, size int64, fileNum base.DiskFileNum,
+) *sharedReadable {
 	return &sharedReadable{
 		objReader: objReader,
 		size:      size,
+		fileNum:   fileNum,
+		provider:  p,
 	}
 }
 
+// ReadAt is part of the objstorage.Readable interface.
 func (r *sharedReadable) ReadAt(ctx context.Context, p []byte, offset int64) error {
+	return r.readInternal(ctx, p, offset, false /* forCompaction */)
+}
+
+// readInternal performs a read for the object, using the cache when
+// appropriate.
+func (r *sharedReadable) readInternal(
+	ctx context.Context, p []byte, offset int64, forCompaction bool,
+) error {
+	if cache := r.provider.shared.cache; cache != nil {
+		flags := sharedcache.ReadFlags{
+			// Don't add data to the cache if this read is for a compaction.
+			ReadOnly: forCompaction,
+		}
+		return r.provider.shared.cache.ReadAt(ctx, r.fileNum, p, offset, r.objReader, flags)
+	}
 	return r.objReader.ReadAt(ctx, p, offset)
 }
 
@@ -59,6 +83,7 @@ type sharedReadHandle struct {
 
 var _ objstorage.ReadHandle = (*sharedReadHandle)(nil)
 
+// ReadAt is part of the objstorage.ReadHandle interface.
 func (r *sharedReadHandle) ReadAt(ctx context.Context, p []byte, offset int64) error {
 	readaheadSize := r.maybeReadahead(offset, len(p))
 
@@ -85,7 +110,8 @@ func (r *sharedReadHandle) ReadAt(ctx context.Context, p []byte, offset int64) e
 		} else {
 			r.readahead.data = make([]byte, readaheadSize)
 		}
-		if err := r.readable.ReadAt(ctx, r.readahead.data, offset); err != nil {
+
+		if err := r.readable.readInternal(ctx, r.readahead.data, offset, r.forCompaction); err != nil {
 			// Make sure we don't treat the data as valid next time.
 			r.readahead.data = r.readahead.data[:0]
 			return err
@@ -94,7 +120,7 @@ func (r *sharedReadHandle) ReadAt(ctx context.Context, p []byte, offset int64) e
 		return nil
 	}
 
-	return r.readable.ReadAt(ctx, p, offset)
+	return r.readable.readInternal(ctx, p, offset, r.forCompaction)
 }
 
 func (r *sharedReadHandle) maybeReadahead(offset int64, len int) int {
@@ -105,16 +131,19 @@
 	return int(r.readahead.state.maybeReadahead(offset, int64(len)))
 }
 
+// Close is part of the objstorage.ReadHandle interface.
 func (r *sharedReadHandle) Close() error {
 	r.readable = nil
 	r.readahead.data = nil
 	return nil
 }
 
+// SetupForCompaction is part of the objstorage.ReadHandle interface.
 func (r *sharedReadHandle) SetupForCompaction() {
 	r.forCompaction = true
 }
 
+// RecordCacheHit is part of the objstorage.ReadHandle interface.
 func (r *sharedReadHandle) RecordCacheHit(_ context.Context, offset, size int64) {
 	if !r.forCompaction {
 		r.readahead.state.recordCacheHit(offset, size)
diff --git a/objstorage/objstorageprovider/sharedcache/shared_cache.go b/objstorage/objstorageprovider/sharedcache/shared_cache.go
index 06771088fd..b42f491901 100644
--- a/objstorage/objstorageprovider/sharedcache/shared_cache.go
+++ b/objstorage/objstorageprovider/sharedcache/shared_cache.go
@@ -14,7 +14,7 @@ import (
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/pebble/internal/base"
 	"github.com/cockroachdb/pebble/internal/invariants"
-	"github.com/cockroachdb/pebble/objstorage"
+	"github.com/cockroachdb/pebble/objstorage/shared"
 	"github.com/cockroachdb/pebble/vfs"
 )
 
@@ -71,11 +71,25 @@ func (c *Cache) Close() error {
 	return retErr
 }
 
+// ReadFlags contains options for Cache.ReadAt.
+type ReadFlags struct {
+	// ReadOnly instructs ReadAt to not write any new data into the cache; it is
+	// used when the data is unlikely to be used again.
+	ReadOnly bool
+}
+
 // ReadAt performs a read from an object, attempting to use cached data when
 // possible.
 func (c *Cache) ReadAt(
-	ctx context.Context, fileNum base.FileNum, p []byte, ofs int64, readable objstorage.Readable,
+	ctx context.Context,
+	fileNum base.DiskFileNum,
+	p []byte,
+	ofs int64,
+	objReader shared.ObjectReader,
+	flags ReadFlags,
 ) error {
+	// TODO(radu): for compaction reads, we may not want to read from the cache at
+	// all.
 	{
 		n, err := c.get(fileNum, p, ofs)
 		if err != nil {
@@ -98,6 +112,12 @@
 		}
 	}
 
+	c.Misses.Add(1)
+
+	if flags.ReadOnly {
+		return objReader.ReadAt(ctx, p, ofs)
+	}
+
 	// We must do reads with offset & size that are multiples of the block size. Else
 	// later cache hits may return incorrect zeroed results from the cache.
 	firstBlockInd := ofs / int64(c.blockSize)
@@ -105,19 +125,14 @@
 	// Take the length of what is left to read plus the length of the adjustment of
 	// the offset plus the size of a block minus one and divide by the size of a block
-	// to get the number of blocks to read from the readable.
+	// to get the number of blocks to read from the object.
 	sizeOfOffAdjustment := int(ofs - adjustedOfs)
 	numBlocksToRead := ((len(p) + sizeOfOffAdjustment) + (c.blockSize - 1)) / c.blockSize
 	adjustedLen := numBlocksToRead * c.blockSize
 	adjustedP := make([]byte, adjustedLen)
 
 	// Read the rest from the object.
-	c.Misses.Add(1)
-	// TODO(josh): To have proper EOF handling, we will need readable.ReadAt to return
-	// the number of bytes read successfully. As is, we cannot tell if the readable.ReadAt
-	// should be returned from cache.ReadAt. For now, the cache just swallows all
-	// io.EOF errors.
-	if err := readable.ReadAt(ctx, adjustedP, adjustedOfs); err != nil && err != io.EOF {
+	if err := objReader.ReadAt(ctx, adjustedP, adjustedOfs); err != nil && err != io.EOF {
 		return err
 	}
 	copy(p, adjustedP[sizeOfOffAdjustment:])
@@ -132,13 +147,14 @@
 	return nil
 }
 
-// get attempts to read the requested data from the cache.
+// get attempts to read the requested data from the cache, if it is already
+// there.
 //
 // If all data is available, returns n = len(p).
 //
 // If data is partially available, a prefix of the data is read; returns n < len(p)
 // and no error. If no prefix is available, returns n = 0 and no error.
-func (c *Cache) get(fileNum base.FileNum, p []byte, ofs int64) (n int, _ error) {
+func (c *Cache) get(fileNum base.DiskFileNum, p []byte, ofs int64) (n int, _ error) {
 	// The data extent might cross shard boundaries, hence the loop. In the hot
 	// path, max two iterations of this loop will be executed, since reads are sized
 	// in units of sstable block size.
@@ -169,7 +185,7 @@ func (c *Cache) get(fileNum base.FileNum, p []byte, ofs int64) (n int, _ error)
 // be multiples of the block size.
 //
 // If all of p is not written to the shard, set returns a non-nil error.
-func (c *Cache) set(fileNum base.FileNum, p []byte, ofs int64) error {
+func (c *Cache) set(fileNum base.DiskFileNum, p []byte, ofs int64) error {
 	if invariants.Enabled {
 		if ofs%int64(c.blockSize) != 0 || len(p)%c.blockSize != 0 {
 			panic(fmt.Sprintf("set with ofs & len not multiples of block size: %v %v", ofs, len(p)))
@@ -200,9 +216,9 @@ func (c *Cache) set(fileNum base.FileNum, p []byte, ofs int64) error {
 	}
 }
 
-func (c *Cache) getShard(fileNum base.FileNum, ofs int64) *shard {
+func (c *Cache) getShard(fileNum base.DiskFileNum, ofs int64) *shard {
 	const prime64 = 1099511628211
-	hash := uint64(fileNum)*prime64 + uint64(ofs)/ShardingBlockSize
+	hash := uint64(fileNum.FileNum())*prime64 + uint64(ofs)/ShardingBlockSize
 	// TODO(josh): Instance change ops are often run in production. Such an operation
 	// updates len(c.shards); see openSharedCache. As a result, the behavior of this
 	// function changes, and the cache empties out at restart time. We may want a better
@@ -224,7 +240,7 @@ type shard struct {
 }
 
 type metadataKey struct {
-	filenum    base.FileNum
+	fileNum    base.DiskFileNum
 	blockIndex int64
 }
 
@@ -274,7 +290,7 @@ func (s *shard) close() error {
 //
 // If data is partially available, a prefix of the data is read; returns n < len(p)
 // and no error. If no prefix is available, returns n = 0 and no error.
-func (s *shard) get(fileNum base.FileNum, p []byte, ofs int64) (n int, _ error) {
+func (s *shard) get(fileNum base.DiskFileNum, p []byte, ofs int64) (n int, _ error) {
 	if invariants.Enabled {
 		if ofs/ShardingBlockSize != (ofs+int64(len(p))-1)/ShardingBlockSize {
 			panic(fmt.Sprintf("get crosses shard boundary: %v %v", ofs, len(p)))
@@ -291,7 +307,7 @@ func (s *shard) get(fileNum base.FileNum, p []byte, ofs int64) (n int, _ error)
 	// in units of sstable block size.
 	for {
 		cacheBlockInd, ok := s.mu.where[metadataKey{
-			filenum:    fileNum,
+			fileNum:    fileNum,
 			blockIndex: (ofs + int64(n)) / int64(s.blockSize),
 		}]
 		if !ok {
@@ -327,7 +343,7 @@ func (s *shard) get(fileNum base.FileNum, p []byte, ofs int64) (n int, _ error)
 // block size.
 //
 // If all of p is not written to the shard, set returns a non-nil error.
-func (s *shard) set(fileNum base.FileNum, p []byte, ofs int64) error {
+func (s *shard) set(fileNum base.DiskFileNum, p []byte, ofs int64) error {
 	if invariants.Enabled {
 		if ofs/ShardingBlockSize != (ofs+int64(len(p))-1)/ShardingBlockSize {
 			panic(fmt.Sprintf("set crosses shard boundary: %v %v", ofs, len(p)))
@@ -364,7 +380,7 @@ func (s *shard) set(fileNum base.FileNum, p []byte, ofs int64) error {
 	}
 
 	s.mu.where[metadataKey{
-		filenum:    fileNum,
+		fileNum:    fileNum,
 		blockIndex: (ofs + int64(n)) / int64(s.blockSize),
 	}] = cacheBlockInd
 
diff --git a/objstorage/objstorageprovider/sharedcache/shared_cache_test.go b/objstorage/objstorageprovider/sharedcache/shared_cache_test.go
index 21ba67c4f1..a6da821293 100644
--- a/objstorage/objstorageprovider/sharedcache/shared_cache_test.go
+++ b/objstorage/objstorageprovider/sharedcache/shared_cache_test.go
@@ -1,6 +1,7 @@
 package sharedcache_test
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"testing"
@@ -35,7 +36,7 @@ func TestSharedCache(t *testing.T) {
 			provider, err := objstorageprovider.Open(objstorageprovider.DefaultSettings(fs, ""))
 			require.NoError(t, err)
 
-			var toWrite []byte
+			var objData []byte
 			datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
 				scanArgs := func(desc string, args ...interface{}) {
 					t.Helper()
@@ -61,10 +62,10 @@ func TestSharedCache(t *testing.T) {
 				defer writable.Finish()
 
 				// With invariants on, Write will modify its input buffer.
-				toWrite = make([]byte, size)
+				objData = make([]byte, size)
 				wrote := make([]byte, size)
 				for i := 0; i < size; i++ {
-					toWrite[i] = byte(i)
+					objData[i] = byte(i)
 					wrote[i] = byte(i)
 				}
 				err = writable.Write(wrote)
@@ -73,9 +74,11 @@
 				require.NoError(t, err)
 
 				return ""
 
-			case "read":
+			case "read", "read-for-compaction":
+				missesBefore := cache.Misses.Load()
 				var size int
 				var offset int64
+				// TODO(radu): swap these arguments (the opposite order is typical).
 				scanArgs("<size> <offset>", &size, &offset)
 				readable, err := provider.OpenForReading(ctx, base.FileTypeTable, base.FileNum(1).DiskFileNum(), objstorage.OpenOptions{})
@@ -83,16 +86,20 @@
 				defer readable.Close()
 
 				got := make([]byte, size)
-				err = cache.ReadAt(ctx, 1, got, offset, readable)
+				flags := sharedcache.ReadFlags{
+					ReadOnly: d.Cmd == "read-for-compaction",
+				}
+				err = cache.ReadAt(ctx, base.FileNum(1).DiskFileNum(), got, offset, readable, flags)
 				// We always expect cache.ReadAt to succeed.
 				require.NoError(t, err)
 				// It is easier to assert this condition programmatically, rather than returning
 				// got, which may be very large.
-				require.Equal(t, toWrite[int(offset):], got)
+				require.True(t, bytes.Equal(objData[int(offset):int(offset)+size], got), "incorrect data returned")
 
 				// TODO(josh): Not tracing out filesystem activity here, since logging_fs.go
 				// doesn't trace calls to ReadAt or WriteAt. We should consider changing this.
-				return fmt.Sprintf("misses=%d", cache.Misses.Load())
+				missesAfter := cache.Misses.Load()
+				return fmt.Sprintf("misses=%d", missesAfter-missesBefore)
 
 			default:
 				d.Fatalf(t, "unknown command %s", d.Cmd)
 				return ""
@@ -131,10 +138,10 @@ func TestSharedCacheRandomized(t *testing.T) {
 		// With invariants on, Write will modify its input buffer.
 		size := rand.Int63n(cacheSize)
-		toWrite := make([]byte, size)
+		objData := make([]byte, size)
 		wrote := make([]byte, size)
 		for i := 0; i < int(size); i++ {
-			toWrite[i] = byte(i)
+			objData[i] = byte(i)
 			wrote[i] = byte(i)
 		}
 
@@ -150,14 +157,14 @@
 		offset := rand.Int63n(size)
 
 		got := make([]byte, size-offset)
-		err = cache.ReadAt(ctx, 1, got, offset, readable)
+		err = cache.ReadAt(ctx, base.FileNum(1).DiskFileNum(), got, offset, readable, sharedcache.ReadFlags{})
 		require.NoError(t, err)
-		require.Equal(t, toWrite[int(offset):], got)
+		require.Equal(t, objData[int(offset):], got)
 
 		got = make([]byte, size-offset)
-		err = cache.ReadAt(ctx, 1, got, offset, readable)
+		err = cache.ReadAt(ctx, base.FileNum(1).DiskFileNum(), got, offset, readable, sharedcache.ReadFlags{})
 		require.NoError(t, err)
-		require.Equal(t, toWrite[int(offset):], got)
+		require.Equal(t, objData[int(offset):], got)
 		}
 	}
 }
diff --git a/objstorage/objstorageprovider/sharedcache/testdata/cache/compaction_reads b/objstorage/objstorageprovider/sharedcache/testdata/cache/compaction_reads
new file mode 100644
index 0000000000..35a366467a
--- /dev/null
+++ b/objstorage/objstorageprovider/sharedcache/testdata/cache/compaction_reads
@@ -0,0 +1,22 @@
+write 200000
+----
+
+read 10000 1024
+----
+misses=1
+
+# This should be in the cache.
+read-for-compaction 2000 4096
+----
+misses=0
+
+# This should miss the cache.
+read-for-compaction 100000 4096
+----
+misses=1
+
+# This should miss the cache again - we don't populate the cache when doing
+# compaction reads.
+read-for-compaction 100000 4096
+----
+misses=1
diff --git a/objstorage/objstorageprovider/sharedcache/testdata/cache/read_larger_than_two_cache_shards b/objstorage/objstorageprovider/sharedcache/testdata/cache/read_larger_than_two_cache_shards
index 8c017f3336..d23b814f57 100644
--- a/objstorage/objstorageprovider/sharedcache/testdata/cache/read_larger_than_two_cache_shards
+++ b/objstorage/objstorageprovider/sharedcache/testdata/cache/read_larger_than_two_cache_shards
@@ -9,4 +9,4 @@ misses=1
 
 read 3145671 57
 ----
-misses=1
+misses=0
diff --git a/objstorage/objstorageprovider/sharedcache/testdata/cache/read_that_hits_two_cache_blocks b/objstorage/objstorageprovider/sharedcache/testdata/cache/read_that_hits_two_cache_blocks
index eb5c1dedb6..68a0a9918c 100644
--- a/objstorage/objstorageprovider/sharedcache/testdata/cache/read_that_hits_two_cache_blocks
+++ b/objstorage/objstorageprovider/sharedcache/testdata/cache/read_that_hits_two_cache_blocks
@@ -9,8 +9,8 @@ misses=1
 
 read 32773 0
 ----
-misses=1
+misses=0
 
 read 32716 57
 ----
-misses=1
+misses=0
diff --git a/objstorage/objstorageprovider/sharedcache/testdata/cache/read_that_hits_two_cache_blocks_with_first_read_at_big_offset b/objstorage/objstorageprovider/sharedcache/testdata/cache/read_that_hits_two_cache_blocks_with_first_read_at_big_offset
index 07bee28f06..d3ca370482 100644
--- a/objstorage/objstorageprovider/sharedcache/testdata/cache/read_that_hits_two_cache_blocks_with_first_read_at_big_offset
+++ b/objstorage/objstorageprovider/sharedcache/testdata/cache/read_that_hits_two_cache_blocks_with_first_read_at_big_offset
@@ -9,4 +9,4 @@ misses=1
 
 read 5 32768
 ----
-misses=1
+misses=0
diff --git a/objstorage/objstorageprovider/sharedcache/testdata/cache/read_that_hits_two_cache_blocks_with_first_read_at_offset b/objstorage/objstorageprovider/sharedcache/testdata/cache/read_that_hits_two_cache_blocks_with_first_read_at_offset
index 2df3ec7729..ef9c58a278 100644
--- a/objstorage/objstorageprovider/sharedcache/testdata/cache/read_that_hits_two_cache_blocks_with_first_read_at_offset
+++ b/objstorage/objstorageprovider/sharedcache/testdata/cache/read_that_hits_two_cache_blocks_with_first_read_at_offset
@@ -9,4 +9,4 @@ misses=1
 
 read 32716 57
 ----
-misses=1
+misses=0
diff --git a/objstorage/objstorageprovider/sharedcache/testdata/cache/read_that_hits_two_cache_shards b/objstorage/objstorageprovider/sharedcache/testdata/cache/read_that_hits_two_cache_shards
index 6fce8fc5d9..4374f9007d 100644
--- a/objstorage/objstorageprovider/sharedcache/testdata/cache/read_that_hits_two_cache_shards
+++ b/objstorage/objstorageprovider/sharedcache/testdata/cache/read_that_hits_two_cache_shards
@@ -9,8 +9,8 @@ misses=1
 
 read 1048776 0
 ----
-misses=1
+misses=0
 
 read 1048719 57
 ----
-misses=1
+misses=0
diff --git a/objstorage/objstorageprovider/sharedcache/testdata/cache/read_that_hits_two_cache_shards_with_first_read_at_offset b/objstorage/objstorageprovider/sharedcache/testdata/cache/read_that_hits_two_cache_shards_with_first_read_at_offset
index b846cb8ad4..c3ada5f64f 100644
--- a/objstorage/objstorageprovider/sharedcache/testdata/cache/read_that_hits_two_cache_shards_with_first_read_at_offset
+++ b/objstorage/objstorageprovider/sharedcache/testdata/cache/read_that_hits_two_cache_shards_with_first_read_at_offset
@@ -9,4 +9,4 @@ misses=1
 
 read 1048719 57
 ----
-misses=1
+misses=0
diff --git a/objstorage/objstorageprovider/sharedcache/testdata/cache/small_read b/objstorage/objstorageprovider/sharedcache/testdata/cache/small_read
index e0b4a5033b..086a0667f1 100644
--- a/objstorage/objstorageprovider/sharedcache/testdata/cache/small_read
+++ b/objstorage/objstorageprovider/sharedcache/testdata/cache/small_read
@@ -9,8 +9,8 @@ misses=1
 
 read 10 0
 ----
-misses=1
+misses=0
 
 read 6 4
 ----
-misses=1
+misses=0
diff --git a/objstorage/objstorageprovider/sharedcache/testdata/cache/small_read_with_first_read_at_offset b/objstorage/objstorageprovider/sharedcache/testdata/cache/small_read_with_first_read_at_offset
index 2a883efbe5..4d6669cb5e 100644
--- a/objstorage/objstorageprovider/sharedcache/testdata/cache/small_read_with_first_read_at_offset
+++ b/objstorage/objstorageprovider/sharedcache/testdata/cache/small_read_with_first_read_at_offset
@@ -9,8 +9,8 @@ misses=1
 
 read 6 4
 ----
-misses=1
+misses=0
 
 read 10 0
 ----
-misses=1
+misses=0
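
The calling convention that the new Cache.ReadAt expects can be shown in isolation. The sketch below mirrors sharedReadable.readInternal from the patch, but uses locally declared stand-ins for base.DiskFileNum, shared.ObjectReader, and the sharedcache types so that it compiles on its own; it is an editorial illustration, not code from the tree.

    package sketch

    import "context"

    // Stand-ins for base.DiskFileNum, shared.ObjectReader, and
    // sharedcache.{Cache,ReadFlags}; declared here only to keep the sketch
    // self-contained.
    type DiskFileNum uint64

    type ObjectReader interface {
        ReadAt(ctx context.Context, p []byte, offset int64) error
    }

    type ReadFlags struct {
        // ReadOnly: do not populate the cache with blocks fetched by this read.
        ReadOnly bool
    }

    type Cache struct{} // fields elided

    func (c *Cache) ReadAt(
        ctx context.Context, fileNum DiskFileNum, p []byte, ofs int64, r ObjectReader, flags ReadFlags,
    ) error {
        // Body elided; the real logic is in shared_cache.go above.
        return r.ReadAt(ctx, p, ofs)
    }

    // readThroughCache routes every read through the cache when one is
    // configured, and marks compaction reads ReadOnly so that data which will
    // be read exactly once does not displace hotter blocks.
    func readThroughCache(
        ctx context.Context, cache *Cache, fileNum DiskFileNum,
        objReader ObjectReader, p []byte, ofs int64, forCompaction bool,
    ) error {
        if cache != nil {
            return cache.ReadAt(ctx, fileNum, p, ofs, objReader, ReadFlags{ReadOnly: forCompaction})
        }
        return objReader.ReadAt(ctx, p, ofs)
    }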
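
The block-alignment arithmetic in Cache.ReadAt (firstBlockInd, adjustedOfs, numBlocksToRead) determines how much extra data a miss fetches before populating the cache. Here is a minimal standalone sketch of the same arithmetic; alignRead is a hypothetical helper, and the 32 KiB block size is assumed to match the testdata above.

    package main

    import "fmt"

    // alignRead widens a read of n bytes at offset ofs to whole cache blocks,
    // mirroring Cache.ReadAt: the offset is rounded down to a block boundary
    // and the length rounded up, so the cache never stores partially populated
    // blocks.
    func alignRead(ofs int64, n int, blockSize int) (adjustedOfs int64, adjustedLen int) {
        firstBlockInd := ofs / int64(blockSize)
        adjustedOfs = firstBlockInd * int64(blockSize)
        // Extra prefix introduced by rounding the offset down.
        sizeOfOffAdjustment := int(ofs - adjustedOfs)
        // Round the total up to a whole number of blocks.
        numBlocksToRead := (n + sizeOfOffAdjustment + blockSize - 1) / blockSize
        return adjustedOfs, numBlocksToRead * blockSize
    }

    func main() {
        // "read 32716 57" from read_that_hits_two_cache_blocks: with 32 KiB
        // blocks it is widened to a 64 KiB read at offset 0, touching two blocks.
        ofs, n := alignRead(57, 32716, 32768)
        fmt.Println(ofs, n) // 0 65536
    }

Under that assumption, the testdata updates in this patch follow directly: once the first read populates the widened blocks, the follow-up reads report misses=0, and a ReadOnly (compaction) read skips the widening entirely and leaves the cache unchanged.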
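
Shard selection in getShard hashes the file number with an FNV-style prime and buckets the offset by ShardingBlockSize, so every cache block within one sharding block of an object lands in the same shard. A standalone sketch follows; shardIndex is a hypothetical helper, and the 1 MiB sharding block size is an assumption consistent with the ~1 MiB offsets in the two-shard testdata.

    package main

    import "fmt"

    // shardIndex mirrors the hash in Cache.getShard: offsets within the same
    // sharding block map to the same shard, so a read that stays inside one
    // sharding block touches a single shard.
    func shardIndex(fileNum uint64, ofs int64, shardingBlockSize int64, numShards int) int {
        const prime64 = 1099511628211
        hash := fileNum*prime64 + uint64(ofs)/uint64(shardingBlockSize)
        return int(hash % uint64(numShards))
    }

    func main() {
        const sbs = 1 << 20 // assumed 1 MiB sharding block size
        fmt.Println(shardIndex(1, 0, sbs, 8))     // some shard s
        fmt.Println(shardIndex(1, sbs-1, sbs, 8)) // same shard s
        fmt.Println(shardIndex(1, sbs, sbs, 8))   // next sharding block: (s+1) mod 8
    }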