objstorageprovider: use cache in ReadAt

This commit adds code to optionally use the shared cache when reading
from an object.

When the read is for a compaction, we don't want to populate the cache
(but we do want to read from it if the data is already there).
RaduBerinde committed Jun 2, 2023
1 parent bceae16 commit f98d3df
Showing 13 changed files with 122 additions and 48 deletions.
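
Before the file-by-file diff, here is a minimal, self-contained sketch of the pattern this commit wires up (plain Go for illustration only; it does not use Pebble's actual objstorage or sharedcache types): cache hits are served for every read, but a miss populates the cache only when the read is not on behalf of a compaction.

package main

import "fmt"

// readFlags mirrors the spirit of sharedcache.ReadFlags: ReadOnly means
// "serve hits, but do not insert new data on a miss".
type readFlags struct {
	ReadOnly bool
}

// cache is a toy read-through cache keyed by offset.
type cache struct {
	blocks map[int64][]byte
}

// readAt tries the cache first; on a miss it reads from the backing object
// and, unless flags.ReadOnly is set, populates the cache for future reads.
func (c *cache) readAt(obj []byte, p []byte, ofs int64, flags readFlags) {
	if b, ok := c.blocks[ofs]; ok && len(b) >= len(p) {
		copy(p, b) // cache hit
		return
	}
	copy(p, obj[ofs:int(ofs)+len(p)]) // miss: read from the object
	if !flags.ReadOnly {
		c.blocks[ofs] = append([]byte(nil), p...) // populate only for regular reads
	}
}

func main() {
	obj := []byte("0123456789abcdef")
	c := &cache{blocks: map[int64][]byte{}}
	buf := make([]byte, 4)

	c.readAt(obj, buf, 0, readFlags{})               // regular read: populates the cache
	c.readAt(obj, buf, 8, readFlags{ReadOnly: true}) // compaction-style read: never populates
	fmt.Println(len(c.blocks))                       // 1: only the regular read was cached
}

In the actual change, ReadHandle.SetupForCompaction is what flips forCompaction on the shared read handle; readInternal then translates that into sharedcache.ReadFlags{ReadOnly: true}.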
2 changes: 1 addition & 1 deletion objstorage/objstorageprovider/shared.go
@@ -240,7 +240,7 @@ func (p *provider) sharedOpenForReading(
}
return nil, err
}
return newSharedReadable(reader, size), nil
return p.newSharedReadable(reader, size, meta.DiskFileNum), nil
}

func (p *provider) sharedSize(meta objstorage.ObjectMetadata) (int64, error) {
35 changes: 32 additions & 3 deletions objstorage/objstorageprovider/shared_readable.go
@@ -7,7 +7,9 @@ package objstorageprovider
import (
"context"

"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
"github.com/cockroachdb/pebble/objstorage/shared"
)

@@ -16,18 +18,40 @@ import (
type sharedReadable struct {
objReader shared.ObjectReader
size int64
fileNum base.DiskFileNum
provider *provider
}

var _ objstorage.Readable = (*sharedReadable)(nil)

func newSharedReadable(objReader shared.ObjectReader, size int64) *sharedReadable {
func (p *provider) newSharedReadable(
objReader shared.ObjectReader, size int64, fileNum base.DiskFileNum,
) *sharedReadable {
return &sharedReadable{
objReader: objReader,
size: size,
fileNum: fileNum,
provider: p,
}
}

// ReadAt is part of the objstorage.Readable interface.
func (r *sharedReadable) ReadAt(ctx context.Context, p []byte, offset int64) error {
return r.readInternal(ctx, p, offset, false /* forCompaction */)
}

// readInternal performs a read for the object, using the cache when
// appropriate.
func (r *sharedReadable) readInternal(
ctx context.Context, p []byte, offset int64, forCompaction bool,
) error {
if cache := r.provider.shared.cache; cache != nil {
flags := sharedcache.ReadFlags{
// Don't add data to the cache if this read is for a compaction.
ReadOnly: forCompaction,
}
return r.provider.shared.cache.ReadAt(ctx, r.fileNum, p, offset, r.objReader, flags)
}
return r.objReader.ReadAt(ctx, p, offset)
}

@@ -59,6 +83,7 @@ type sharedReadHandle struct {

var _ objstorage.ReadHandle = (*sharedReadHandle)(nil)

// ReadAt is part of the objstorage.ReadHandle interface.
func (r *sharedReadHandle) ReadAt(ctx context.Context, p []byte, offset int64) error {
readaheadSize := r.maybeReadahead(offset, len(p))

@@ -85,7 +110,8 @@ func (r *sharedReadHandle) ReadAt(ctx context.Context, p []byte, offset int64) e
} else {
r.readahead.data = make([]byte, readaheadSize)
}
if err := r.readable.ReadAt(ctx, r.readahead.data, offset); err != nil {

if err := r.readable.readInternal(ctx, r.readahead.data, offset, r.forCompaction); err != nil {
// Make sure we don't treat the data as valid next time.
r.readahead.data = r.readahead.data[:0]
return err
@@ -94,7 +120,7 @@ func (r *sharedReadHandle) ReadAt(ctx context.Context, p []byte, offset int64) e
return nil
}

return r.readable.ReadAt(ctx, p, offset)
return r.readable.readInternal(ctx, p, offset, r.forCompaction)
}

func (r *sharedReadHandle) maybeReadahead(offset int64, len int) int {
@@ -105,16 +131,19 @@ func (r *sharedReadHandle) maybeReadahead(offset int64, len int) int {
return int(r.readahead.state.maybeReadahead(offset, int64(len)))
}

// Close is part of the objstorage.ReadHandle interface.
func (r *sharedReadHandle) Close() error {
r.readable = nil
r.readahead.data = nil
return nil
}

// SetupForCompaction is part of the objstorage.ReadHandle interface.
func (r *sharedReadHandle) SetupForCompaction() {
r.forCompaction = true
}

// RecordCacheHit is part of the objstorage.ReadHandle interface.
func (r *sharedReadHandle) RecordCacheHit(_ context.Context, offset, size int64) {
if !r.forCompaction {
r.readahead.state.recordCacheHit(offset, size)
54 changes: 35 additions & 19 deletions objstorage/objstorageprovider/sharedcache/shared_cache.go
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/objstorage/shared"
"github.com/cockroachdb/pebble/vfs"
)

@@ -71,11 +71,25 @@ func (c *Cache) Close() error {
return retErr
}

// ReadFlags contains options for Cache.ReadAt.
type ReadFlags struct {
// ReadOnly instructs ReadAt to not write any new data into the cache; it is
// used when the data is unlikely to be used again.
ReadOnly bool
}

// ReadAt performs a read from an object, attempting to use cached data when
// possible.
func (c *Cache) ReadAt(
ctx context.Context, fileNum base.FileNum, p []byte, ofs int64, readable objstorage.Readable,
ctx context.Context,
fileNum base.DiskFileNum,
p []byte,
ofs int64,
objReader shared.ObjectReader,
flags ReadFlags,
) error {
// TODO(radu): for compaction reads, we may not want to read from the cache at
// all.
{
n, err := c.get(fileNum, p, ofs)
if err != nil {
@@ -98,26 +112,27 @@ func (c *Cache) ReadAt(
}
}

c.Misses.Add(1)

if flags.ReadOnly {
return objReader.ReadAt(ctx, p, ofs)
}

// We must do reads with offset & size that are multiples of the block size. Else
// later cache hits may return incorrect zeroed results from the cache.
firstBlockInd := ofs / int64(c.blockSize)
adjustedOfs := firstBlockInd * int64(c.blockSize)

// Take the length of what is left to read plus the length of the adjustment of
// the offset plus the size of a block minus one and divide by the size of a block
// to get the number of blocks to read from the readable.
// to get the number of blocks to read from the object.
sizeOfOffAdjustment := int(ofs - adjustedOfs)
numBlocksToRead := ((len(p) + sizeOfOffAdjustment) + (c.blockSize - 1)) / c.blockSize
adjustedLen := numBlocksToRead * c.blockSize
adjustedP := make([]byte, adjustedLen)
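// For example, with blockSize = 4096, ofs = 5000 and len(p) = 4000:
// adjustedOfs = 4096, sizeOfOffAdjustment = 904, numBlocksToRead =
// (4000+904+4095)/4096 = 2, and adjustedLen = 8192, so the read below covers
// [4096, 12288), which contains the requested range [5000, 9000).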

// Read the rest from the object.
c.Misses.Add(1)
// TODO(josh): To have proper EOF handling, we will need readable.ReadAt to return
// the number of bytes read successfully. As is, we cannot tell if the readable.ReadAt
// should be returned from cache.ReadAt. For now, the cache just swallows all
// io.EOF errors.
if err := readable.ReadAt(ctx, adjustedP, adjustedOfs); err != nil && err != io.EOF {
if err := objReader.ReadAt(ctx, adjustedP, adjustedOfs); err != nil && err != io.EOF {
return err
}
copy(p, adjustedP[sizeOfOffAdjustment:])
@@ -132,13 +147,14 @@ func (c *Cache) ReadAt(
return nil
}

// get attempts to read the requested data from the cache.
// get attempts to read the requested data from the cache, if it is already
// there.
//
// If all data is available, returns n = len(p).
//
// If data is partially available, a prefix of the data is read; returns n < len(p)
// and no error. If no prefix is available, returns n = 0 and no error.
func (c *Cache) get(fileNum base.FileNum, p []byte, ofs int64) (n int, _ error) {
func (c *Cache) get(fileNum base.DiskFileNum, p []byte, ofs int64) (n int, _ error) {
// The data extent might cross shard boundaries, hence the loop. In the hot
// path, max two iterations of this loop will be executed, since reads are sized
// in units of sstable block size.
@@ -169,7 +185,7 @@ func (c *Cache) get(fileNum base.FileNum, p []byte, ofs int64) (n int, _ error)
// be multiples of the block size.
//
// If all of p is not written to the shard, set returns a non-nil error.
func (c *Cache) set(fileNum base.FileNum, p []byte, ofs int64) error {
func (c *Cache) set(fileNum base.DiskFileNum, p []byte, ofs int64) error {
if invariants.Enabled {
if ofs%int64(c.blockSize) != 0 || len(p)%c.blockSize != 0 {
panic(fmt.Sprintf("set with ofs & len not multiples of block size: %v %v", ofs, len(p)))
@@ -200,9 +216,9 @@ func (c *Cache) set(fileNum base.FileNum, p []byte, ofs int64) error {
}
}

func (c *Cache) getShard(fileNum base.FileNum, ofs int64) *shard {
func (c *Cache) getShard(fileNum base.DiskFileNum, ofs int64) *shard {
const prime64 = 1099511628211
hash := uint64(fileNum)*prime64 + uint64(ofs)/ShardingBlockSize
hash := uint64(fileNum.FileNum())*prime64 + uint64(ofs)/ShardingBlockSize
// TODO(josh): Instance change ops are often run in production. Such an operation
// updates len(c.shards); see openSharedCache. As a result, the behavior of this
// function changes, and the cache empties out at restart time. We may want a better
Expand All @@ -224,7 +240,7 @@ type shard struct {
}

type metadataKey struct {
filenum base.FileNum
fileNum base.DiskFileNum
blockIndex int64
}

@@ -274,7 +290,7 @@ func (s *shard) close() error {
//
// If data is partially available, a prefix of the data is read; returns n < len(p)
// and no error. If no prefix is available, returns n = 0 and no error.
func (s *shard) get(fileNum base.FileNum, p []byte, ofs int64) (n int, _ error) {
func (s *shard) get(fileNum base.DiskFileNum, p []byte, ofs int64) (n int, _ error) {
if invariants.Enabled {
if ofs/ShardingBlockSize != (ofs+int64(len(p))-1)/ShardingBlockSize {
panic(fmt.Sprintf("get crosses shard boundary: %v %v", ofs, len(p)))
@@ -291,7 +307,7 @@ func (s *shard) get(fileNum base.FileNum, p []byte, ofs int64) (n int, _ error)
// in units of sstable block size.
for {
cacheBlockInd, ok := s.mu.where[metadataKey{
filenum: fileNum,
fileNum: fileNum,
blockIndex: (ofs + int64(n)) / int64(s.blockSize),
}]
if !ok {
@@ -327,7 +343,7 @@ func (s *shard) set(fileNum base.FileNum, p []byte, ofs int64) error {
// block size.
//
// If all of p is not written to the shard, set returns a non-nil error.
func (s *shard) set(fileNum base.FileNum, p []byte, ofs int64) error {
func (s *shard) set(fileNum base.DiskFileNum, p []byte, ofs int64) error {
if invariants.Enabled {
if ofs/ShardingBlockSize != (ofs+int64(len(p))-1)/ShardingBlockSize {
panic(fmt.Sprintf("set crosses shard boundary: %v %v", ofs, len(p)))
@@ -364,7 +380,7 @@ func (s *shard) set(fileNum base.FileNum, p []byte, ofs int64) error {
}

s.mu.where[metadataKey{
filenum: fileNum,
fileNum: fileNum,
blockIndex: (ofs + int64(n)) / int64(s.blockSize),
}] = cacheBlockInd

33 changes: 20 additions & 13 deletions objstorage/objstorageprovider/sharedcache/shared_cache_test.go
@@ -1,6 +1,7 @@
package sharedcache_test

import (
"bytes"
"context"
"fmt"
"testing"
@@ -35,7 +36,7 @@ func TestSharedCache(t *testing.T) {
provider, err := objstorageprovider.Open(objstorageprovider.DefaultSettings(fs, ""))
require.NoError(t, err)

var toWrite []byte
var objData []byte
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
scanArgs := func(desc string, args ...interface{}) {
t.Helper()
@@ -61,10 +62,10 @@ func TestSharedCache(t *testing.T) {
defer writable.Finish()

// With invariants on, Write will modify its input buffer.
toWrite = make([]byte, size)
objData = make([]byte, size)
wrote := make([]byte, size)
for i := 0; i < size; i++ {
toWrite[i] = byte(i)
objData[i] = byte(i)
wrote[i] = byte(i)
}
err = writable.Write(wrote)
@@ -73,26 +74,32 @@ func TestSharedCache(t *testing.T) {
require.NoError(t, err)

return ""
case "read":
case "read", "read-for-compaction":
missesBefore := cache.Misses.Load()
var size int
var offset int64
// TODO(radu): swap these arguments (the opposite order is typical).
scanArgs("<size> <offset>", &size, &offset)

readable, err := provider.OpenForReading(ctx, base.FileTypeTable, base.FileNum(1).DiskFileNum(), objstorage.OpenOptions{})
require.NoError(t, err)
defer readable.Close()

got := make([]byte, size)
err = cache.ReadAt(ctx, 1, got, offset, readable)
flags := sharedcache.ReadFlags{
ReadOnly: d.Cmd == "read-for-compaction",
}
err = cache.ReadAt(ctx, base.FileNum(1).DiskFileNum(), got, offset, readable, flags)
// We always expect cache.ReadAt to succeed.
require.NoError(t, err)
// It is easier to assert this condition programmatically, rather than returning
// got, which may be very large.
require.Equal(t, toWrite[int(offset):], got)
require.True(t, bytes.Equal(objData[int(offset):int(offset)+size], got), "incorrect data returned")

// TODO(josh): Not tracing out filesystem activity here, since logging_fs.go
// doesn't trace calls to ReadAt or WriteAt. We should consider changing this.
return fmt.Sprintf("misses=%d", cache.Misses.Load())
missesAfter := cache.Misses.Load()
return fmt.Sprintf("misses=%d", missesAfter-missesBefore)
default:
d.Fatalf(t, "unknown command %s", d.Cmd)
return ""
Expand Down Expand Up @@ -131,10 +138,10 @@ func TestSharedCacheRandomized(t *testing.T) {

// With invariants on, Write will modify its input buffer.
size := rand.Int63n(cacheSize)
toWrite := make([]byte, size)
objData := make([]byte, size)
wrote := make([]byte, size)
for i := 0; i < int(size); i++ {
toWrite[i] = byte(i)
objData[i] = byte(i)
wrote[i] = byte(i)
}

@@ -150,14 +157,14 @@ func TestSharedCacheRandomized(t *testing.T) {
offset := rand.Int63n(size)

got := make([]byte, size-offset)
err = cache.ReadAt(ctx, 1, got, offset, readable)
err = cache.ReadAt(ctx, base.FileNum(1).DiskFileNum(), got, offset, readable, sharedcache.ReadFlags{})
require.NoError(t, err)
require.Equal(t, toWrite[int(offset):], got)
require.Equal(t, objData[int(offset):], got)

got = make([]byte, size-offset)
err = cache.ReadAt(ctx, 1, got, offset, readable)
err = cache.ReadAt(ctx, base.FileNum(1).DiskFileNum(), got, offset, readable, sharedcache.ReadFlags{})
require.NoError(t, err)
require.Equal(t, toWrite[int(offset):], got)
require.Equal(t, objData[int(offset):], got)
}
}
}
@@ -0,0 +1,22 @@
write 200000
----

read 10000 1024
----
misses=1

# This should be in the cache.
read-for-compaction 2000 4096
----
misses=0

# This should miss the cache.
read-for-compaction 100000 4096
----
misses=1

# This should miss the cache again - we don't populate the cache when doing
# compaction reads.
read-for-compaction 100000 4096
----
misses=1
@@ -9,4 +9,4 @@ misses=1

read 3145671 57
----
misses=1
misses=0