From c8f96df15c75d8a4eea1667d3bd79e50db1044e7 Mon Sep 17 00:00:00 2001 From: "Wenlei (Frank) He" Date: Wed, 18 Dec 2024 01:11:04 +0000 Subject: [PATCH 01/14] Add sudo codes --- internal/gcsx/random_reader.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/internal/gcsx/random_reader.go b/internal/gcsx/random_reader.go index 3a58d474d8..3f11dd5b5c 100644 --- a/internal/gcsx/random_reader.go +++ b/internal/gcsx/random_reader.go @@ -97,6 +97,12 @@ func NewRandomReader(o *gcs.MinObject, bucket gcs.Bucket, sequentialReadSizeMb i } } +type reader struct { + reader io.ReadCloser + start int64 + limit int64 +} + type randomReader struct { object *gcs.MinObject bucket gcs.Bucket @@ -106,6 +112,8 @@ type randomReader struct { // INVARIANT: (reader == nil) == (cancel == nil) reader io.ReadCloser cancel func() + + readers *reader // The range of the object that we expect reader to yield, when reader is // non-nil. When reader is nil, limit is the limit of the previous read @@ -136,6 +144,10 @@ type randomReader struct { metricHandle common.MetricHandle } +func (rr *randomReader) lookupReader(offset int64) *reader, error { + return nil, fmt.Errorf("Not implemented") +} + func (rr *randomReader) CheckInvariants() { // INVARIANT: (reader == nil) == (cancel == nil) if (rr.reader == nil) != (rr.cancel == nil) { @@ -292,6 +304,8 @@ func (rr *randomReader) ReadAt( err = io.EOF return } + + reader == rr.lookupReader(offset) // When the offset is AFTER the reader position, try to seek forward, within reason. // This happens when the kernel page cache serves some data. 
It's very common for @@ -530,6 +544,12 @@ func (rr *randomReader) startRead( rr.cancel = cancel rr.start = start rr.limit = end + + rr.readers = append(rr.readers, &reader{ + reader: rc, + start: start, + limit: end, + }) requestedDataSize := end - start common.CaptureGCSReadMetrics(ctx, rr.metricHandle, readType, requestedDataSize) From d9fe43683ee4efcf43cb66333c9962c6b7975f9c Mon Sep 17 00:00:00 2001 From: "Wenlei (Frank) He" Date: Thu, 19 Dec 2024 00:57:21 +0000 Subject: [PATCH 02/14] bucket reader cache --- internal/gcsx/random_reader.go | 240 ++++++++++++++++++++++++--------- 1 file changed, 173 insertions(+), 67 deletions(-) diff --git a/internal/gcsx/random_reader.go b/internal/gcsx/random_reader.go index 3f11dd5b5c..10f643d5e6 100644 --- a/internal/gcsx/random_reader.go +++ b/internal/gcsx/random_reader.go @@ -46,7 +46,7 @@ const minReadSize = MB // Max read size in bytes for random reads. // If the average read size (between seeks) is below this number, reads will // optimised for random access. -// We will skip forwards in a GCS response at most this many bytes. + // About 6 MB of data is buffered anyway, so 8 MB seems like a good round number. const maxReadSize = 8 * MB @@ -97,10 +97,32 @@ func NewRandomReader(o *gcs.MinObject, bucket gcs.Bucket, sequentialReadSizeMb i } } -type reader struct { - reader io.ReadCloser - start int64 - limit int64 +// bucketReader holds a open stream to a GCS bucket for a given range. +type bucketReader struct { + reader io.ReadCloser + start int64 + end int64 + lastUsed time.Time +} + +func (br *bucketReader) isWithinRange(offset int64) bool { + return offset >= br.start && offset < br.end +} + +func (br *bucketReader) read(offset int64, p []byte) (int, error) { + bytesToSkip := int64(offset - br.start) + skip := make([]byte, bytesToSkip) + n, err := io.ReadFull(br.reader, skip) + if err != nil { + return 0, err + } + br.start += int64(n) + + // Actually read the data into the buffer. 
+ n, err = io.ReadFull(br.reader, p) + br.start += int64(n) + br.lastUsed = time.Now() + return n, err } type randomReader struct { @@ -112,8 +134,8 @@ type randomReader struct { // INVARIANT: (reader == nil) == (cancel == nil) reader io.ReadCloser cancel func() - - readers *reader + + bucketReaders []*bucketReader // The range of the object that we expect reader to yield, when reader is // non-nil. When reader is nil, limit is the limit of the previous read @@ -144,8 +166,13 @@ type randomReader struct { metricHandle common.MetricHandle } -func (rr *randomReader) lookupReader(offset int64) *reader, error { - return nil, fmt.Errorf("Not implemented") +func (rr *randomReader) getBucketReader(offset int64) *bucketReader { + for _, br := range rr.bucketReaders { + if br.isWithinRange(offset) { + return br + } + } + return nil } func (rr *randomReader) CheckInvariants() { @@ -304,72 +331,25 @@ func (rr *randomReader) ReadAt( err = io.EOF return } - - reader == rr.lookupReader(offset) - - // When the offset is AFTER the reader position, try to seek forward, within reason. - // This happens when the kernel page cache serves some data. It's very common for - // concurrent reads, often by only a few 128kB fuse read requests. The aim is to - // re-use GCS connection and avoid throwing away already read data. - // For parallel sequential reads to a single file, not throwing away the connections - // is a 15-20x improvement in throughput: 150-200 MB/s instead of 10 MB/s. - if rr.reader != nil && rr.start < offset && offset-rr.start < maxReadSize { - bytesToSkip := int64(offset - rr.start) - p := make([]byte, bytesToSkip) - n, _ := io.ReadFull(rr.reader, p) - rr.start += int64(n) - } - - // If we have an existing reader but it's positioned at the wrong place, - // clean it up and throw it away. - if rr.reader != nil && rr.start != offset { - rr.reader.Close() - rr.reader = nil - rr.cancel = nil - rr.seeks++ - } - // If we don't have a reader, start a read operation. 
- if rr.reader == nil { - err = rr.startRead(ctx, offset, int64(len(p))) + br := rr.getBucketReader(offset) + if br == nil { + br, err = rr.newBucketReader(ctx, offset, offset+maxReadSize) if err != nil { - err = fmt.Errorf("startRead: %w", err) + err = fmt.Errorf("ReadAt: while creating bucket reader: %w", err) return } - } - // Now we have a reader positioned at the correct place. Consume as much from - // it as possible. + rr.bucketReaders = append(rr.bucketReaders, br) + } var tmp int - tmp, err = rr.readFull(ctx, p) + tmp, err = br.read(ctx, p) n += tmp p = p[tmp:] - rr.start += int64(tmp) offset += int64(tmp) rr.totalReadBytes += uint64(tmp) - // Sanity check. - if rr.start > rr.limit { - err = fmt.Errorf("reader returned %d too many bytes", rr.start-rr.limit) - - // Don't attempt to reuse the reader when it's behaving wackily. - rr.reader.Close() - rr.reader = nil - rr.cancel = nil - rr.start = -1 - rr.limit = -1 - - return - } - - // Are we finished with this reader now? - if rr.start == rr.limit { - rr.reader.Close() - rr.reader = nil - rr.cancel = nil - } - // Handle errors. switch { case err == io.EOF || err == io.ErrUnexpectedEOF: @@ -385,9 +365,94 @@ func (rr *randomReader) ReadAt( case err != nil: // Propagate other errors. - err = fmt.Errorf("readFull: %w", err) + err = fmt.Errorf("read error: %w", err) return } + // When the offset is AFTER the reader position, try to seek forward, within reason. + // This happens when the kernel page cache serves some data. It's very common for + // concurrent reads, often by only a few 128kB fuse read requests. The aim is to + // re-use GCS connection and avoid throwing away already read data. + // For parallel sequential reads to a single file, not throwing away the connections + // is a 15-20x improvement in throughput: 150-200 MB/s instead of 10 MB/s. 
+ /* + if rr.reader != nil && rr.start < offset && offset-rr.start < maxReadSize { + bytesToSkip := int64(offset - rr.start) + p := make([]byte, bytesToSkip) + n, _ := io.ReadFull(rr.reader, p) + rr.start += int64(n) + } + + + // If we have an existing reader but it's positioned at the wrong place, + // clean it up and throw it away. + if rr.reader != nil && rr.start != offset { + rr.reader.Close() + rr.reader = nil + rr.cancel = nil + rr.seeks++ + } + + // If we don't have a reader, start a read operation. + if rr.reader == nil { + err = rr.startRead(ctx, offset, int64(len(p))) + if err != nil { + err = fmt.Errorf("startRead: %w", err) + return + } + } + + // Now we have a reader positioned at the correct place. Consume as much from + // it as possible. + var tmp int + tmp, err = rr.readFull(ctx, p) + + n += tmp + p = p[tmp:] + rr.start += int64(tmp) + offset += int64(tmp) + rr.totalReadBytes += uint64(tmp) + + // Sanity check. + if rr.start > rr.limit { + err = fmt.Errorf("reader returned %d too many bytes", rr.start-rr.limit) + + // Don't attempt to reuse the reader when it's behaving wackily. + rr.reader.Close() + rr.reader = nil + rr.cancel = nil + rr.start = -1 + rr.limit = -1 + + return + } + + // Are we finished with this reader now? + if rr.start == rr.limit { + rr.reader.Close() + rr.reader = nil + rr.cancel = nil + } + + // Handle errors. + switch { + case err == io.EOF || err == io.ErrUnexpectedEOF: + // For a non-empty buffer, ReadFull returns EOF or ErrUnexpectedEOF only + // if the reader peters out early. That's fine, but it means we should + // have hit the limit above. + if rr.reader != nil { + err = fmt.Errorf("reader returned %d too few bytes", rr.limit-rr.start) + return + } + + err = nil + + case err != nil: + // Propagate other errors. 
+ err = fmt.Errorf("readFull: %w", err) + return + } + + */ } return @@ -454,6 +519,47 @@ func (rr *randomReader) readFull( return } +func (rr *randomReader) newBucketReader( + ctx context.Context, + start int64, + end int64) (*bucketReader, error) { + + // Begin the read. + ctx, _ = context.WithCancel(context.Background()) + rc, err := rr.bucket.NewReader( + ctx, + &gcs.ReadObjectRequest{ + Name: rr.object.Name, + Generation: rr.object.Generation, + Range: &gcs.ByteRange{ + Start: uint64(start), + Limit: uint64(end), + }, + ReadCompressed: rr.object.HasContentEncodingGzip(), + }) + + // If a file handle is open locally, but the corresponding object doesn't exist + // in GCS, it indicates a file clobbering scenario. This likely occurred because: + // - The file was deleted in GCS while a local handle was still open. + // - The file content was modified leading to different generation number. + var notFoundError *gcs.NotFoundError + if errors.As(err, ¬FoundError) { + return nil, &gcsfuse_errors.FileClobberedError{ + Err: fmt.Errorf("NewReader: %w", err), + } + } + + common.CaptureGCSReadMetrics(ctx, rr.metricHandle, util.Sequential, end-start) + p := make([]byte, maxReadSize) + n, _ := rc.reader.Read(p) + + return &bucketReader{ + reader: rc, + start: start, + end: start + int64(n), + }, nil +} + // Ensure that rr.reader is set up for a range for which [start, start+size) is // a prefix. Irrespective of the size requested, we try to fetch more data // from GCS defined by sequentialReadSizeMb flag to serve future read requests. 
@@ -544,11 +650,11 @@ func (rr *randomReader) startRead( rr.cancel = cancel rr.start = start rr.limit = end - - rr.readers = append(rr.readers, &reader{ + + rr.rangeCaches = append(rr.rangeCaches, &reader{ reader: rc, start: start, - limit: end, + end: end, }) requestedDataSize := end - start From 32747c073aae8a88aa0219180aef1eb6cdf5eb6a Mon Sep 17 00:00:00 2001 From: "Wenlei (Frank) He" Date: Thu, 19 Dec 2024 01:06:16 +0000 Subject: [PATCH 03/14] bucket reader cache --- internal/gcsx/random_reader.go | 34 ++++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/internal/gcsx/random_reader.go b/internal/gcsx/random_reader.go index 10f643d5e6..e27f119559 100644 --- a/internal/gcsx/random_reader.go +++ b/internal/gcsx/random_reader.go @@ -46,7 +46,7 @@ const minReadSize = MB // Max read size in bytes for random reads. // If the average read size (between seeks) is below this number, reads will // optimised for random access. - +// We will skip forwards in a GCS response at most this many bytes. // About 6 MB of data is buffered anyway, so 8 MB seems like a good round number. const maxReadSize = 8 * MB @@ -168,6 +168,9 @@ type randomReader struct { func (rr *randomReader) getBucketReader(offset int64) *bucketReader { for _, br := range rr.bucketReaders { + if br.reader == nil { + continue + } if br.isWithinRange(offset) { return br } @@ -350,6 +353,16 @@ func (rr *randomReader) ReadAt( offset += int64(tmp) rr.totalReadBytes += uint64(tmp) + if br.start == br.end { + err := br.reader.Close() + if err != nil { + logger.Tracef("Closing bucketReader: %v", err) + } + + // TODO: remove the bucket reader from the list. + br.reader = nil + } + // Handle errors. 
switch { case err == io.EOF || err == io.ErrUnexpectedEOF: @@ -464,6 +477,15 @@ func (rr *randomReader) Object() (o *gcs.MinObject) { } func (rr *randomReader) Destroy() { + for _, br := range rr.bucketReaders { + if br.reader != nil { + err := br.reader.Close() + br.reader = nil + if err != nil { + logger.Warnf("rr.Destroy(): while closing bucket reader: %v", err) + } + } + } // Close out the reader, if we have one. if rr.reader != nil { err := rr.reader.Close() @@ -550,13 +572,11 @@ func (rr *randomReader) newBucketReader( } common.CaptureGCSReadMetrics(ctx, rr.metricHandle, util.Sequential, end-start) - p := make([]byte, maxReadSize) - n, _ := rc.reader.Read(p) return &bucketReader{ reader: rc, start: start, - end: start + int64(n), + end: end, }, nil } @@ -651,12 +671,6 @@ func (rr *randomReader) startRead( rr.start = start rr.limit = end - rr.rangeCaches = append(rr.rangeCaches, &reader{ - reader: rc, - start: start, - end: end, - }) - requestedDataSize := end - start common.CaptureGCSReadMetrics(ctx, rr.metricHandle, readType, requestedDataSize) From 43af5ab07fa886a79cca80c7cc218434037a92b2 Mon Sep 17 00:00:00 2001 From: "Wenlei (Frank) He" Date: Thu, 19 Dec 2024 01:09:42 +0000 Subject: [PATCH 04/14] bucket reader cache --- internal/gcsx/random_reader.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/internal/gcsx/random_reader.go b/internal/gcsx/random_reader.go index e27f119559..eec42ebec9 100644 --- a/internal/gcsx/random_reader.go +++ b/internal/gcsx/random_reader.go @@ -337,6 +337,7 @@ func (rr *randomReader) ReadAt( br := rr.getBucketReader(offset) if br == nil { + // TODO: remove stale bucket readers before adding new one. br, err = rr.newBucketReader(ctx, offset, offset+maxReadSize) if err != nil { err = fmt.Errorf("ReadAt: while creating bucket reader: %w", err) @@ -359,7 +360,7 @@ func (rr *randomReader) ReadAt( logger.Tracef("Closing bucketReader: %v", err) } - // TODO: remove the bucket reader from the list. 
+ // TODO: remove this bucket reader from the list. br.reader = nil } @@ -574,9 +575,10 @@ func (rr *randomReader) newBucketReader( common.CaptureGCSReadMetrics(ctx, rr.metricHandle, util.Sequential, end-start) return &bucketReader{ - reader: rc, - start: start, - end: end, + reader: rc, + start: start, + end: end, + lastUsed: time.Now(), }, nil } From f29d2946a8f9124bbf0147e3212d0b2c746d2e76 Mon Sep 17 00:00:00 2001 From: "Wenlei (Frank) He" Date: Thu, 19 Dec 2024 01:10:24 +0000 Subject: [PATCH 05/14] bucket reader cache --- internal/gcsx/random_reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/gcsx/random_reader.go b/internal/gcsx/random_reader.go index eec42ebec9..4b66895f31 100644 --- a/internal/gcsx/random_reader.go +++ b/internal/gcsx/random_reader.go @@ -106,7 +106,7 @@ type bucketReader struct { } func (br *bucketReader) isWithinRange(offset int64) bool { - return offset >= br.start && offset < br.end + return br.reader != nil && offset >= br.start && offset < br.end } func (br *bucketReader) read(offset int64, p []byte) (int, error) { From e4ff705d8b160194b94108907bc00c8725fb5774 Mon Sep 17 00:00:00 2001 From: "Wenlei (Frank) He" Date: Thu, 19 Dec 2024 05:34:34 +0000 Subject: [PATCH 06/14] add bucket reader --- internal/gcsx/random_reader.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/internal/gcsx/random_reader.go b/internal/gcsx/random_reader.go index 4b66895f31..77acc5efb1 100644 --- a/internal/gcsx/random_reader.go +++ b/internal/gcsx/random_reader.go @@ -338,16 +338,19 @@ func (rr *randomReader) ReadAt( br := rr.getBucketReader(offset) if br == nil { // TODO: remove stale bucket readers before adding new one. - br, err = rr.newBucketReader(ctx, offset, offset+maxReadSize) + // NOte: it doesn't really prefetch rr.sequentialReadSizeMb that many bytes. It just keeps + // a open stream that could read up to that many bytes. It only keeps about 6MB in memory. 
+ br, err = rr.newBucketReader(ctx, offset, offset+int64(rr.sequentialReadSizeMb*MB)) if err != nil { err = fmt.Errorf("ReadAt: while creating bucket reader: %w", err) return } rr.bucketReaders = append(rr.bucketReaders, br) + logger.Tracef("Adding a bucketReader, total bucketReaders: %d", len(rr.bucketReaders)) } var tmp int - tmp, err = br.read(ctx, p) + tmp, err = br.read(offset, p) n += tmp p = p[tmp:] @@ -357,7 +360,7 @@ func (rr *randomReader) ReadAt( if br.start == br.end { err := br.reader.Close() if err != nil { - logger.Tracef("Closing bucketReader: %v", err) + logger.Tracef("Closing bucketReader error: %v", err) } // TODO: remove this bucket reader from the list. From 3e5ce22a403ea0d2b7ec02b411c26ee5002e4647 Mon Sep 17 00:00:00 2001 From: "Wenlei (Frank) He" Date: Thu, 19 Dec 2024 21:41:43 +0000 Subject: [PATCH 07/14] add bucket reader --- internal/gcsx/random_reader.go | 34 ++++++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/internal/gcsx/random_reader.go b/internal/gcsx/random_reader.go index 77acc5efb1..25ee7123a1 100644 --- a/internal/gcsx/random_reader.go +++ b/internal/gcsx/random_reader.go @@ -99,24 +99,35 @@ func NewRandomReader(o *gcs.MinObject, bucket gcs.Bucket, sequentialReadSizeMb i // bucketReader holds a open stream to a GCS bucket for a given range. 
type bucketReader struct { - reader io.ReadCloser - start int64 - end int64 - lastUsed time.Time + pid uint32 + reader io.ReadCloser + start int64 + end int64 + numSeeks int64 + bytesRead int64 + lastUsed time.Time } func (br *bucketReader) isWithinRange(offset int64) bool { return br.reader != nil && offset >= br.start && offset < br.end } +func (br *bucketReader) isPidMatched(pid uint32) bool { + return pid > 0 && br.pid == pid +} + func (br *bucketReader) read(offset int64, p []byte) (int, error) { bytesToSkip := int64(offset - br.start) skip := make([]byte, bytesToSkip) + if bytesToSkip > 0 { + br.numSeeks++ + } n, err := io.ReadFull(br.reader, skip) if err != nil { return 0, err } br.start += int64(n) + br.bytesRead += int64(n) // Actually read the data into the buffer. n, err = io.ReadFull(br.reader, p) @@ -166,12 +177,14 @@ type randomReader struct { metricHandle common.MetricHandle } -func (rr *randomReader) getBucketReader(offset int64) *bucketReader { +// func (rr *randomReader) getBucketReader(offset int64) *bucketReader { +func (rr *randomReader) getBucketReader(pid uint32) *bucketReader { for _, br := range rr.bucketReaders { if br.reader == nil { continue } - if br.isWithinRange(offset) { + //if br.isWithinRange(offset) { + if br.isPidMatched(pid) { return br } } @@ -327,7 +340,8 @@ func (rr *randomReader) ReadAt( if cacheHit || n == len(p) || (n < len(p) && uint64(offset)+uint64(n) == rr.object.Size) { return } - + readOp := ctx.Value(ReadOp).(*fuseops.ReadFileOp) + pid := readOp.OpContext.Pid for len(p) > 0 { // Have we blown past the end of the object? if offset >= int64(rr.object.Size) { @@ -335,7 +349,8 @@ func (rr *randomReader) ReadAt( return } - br := rr.getBucketReader(offset) + //br := rr.getBucketReader(offset) + br := rr.getBucketReader(pid) if br == nil { // TODO: remove stale bucket readers before adding new one. // NOte: it doesn't really prefetch rr.sequentialReadSizeMb that many bytes. 
It just keeps @@ -358,6 +373,8 @@ func (rr *randomReader) ReadAt( rr.totalReadBytes += uint64(tmp) if br.start == br.end { + logger.Tracef("Closing bucketReader for pid: %d, numSeeks: %d, bytesRead: %d", br.pid, br.numSeeks, br.bytesRead) + err := br.reader.Close() if err != nil { logger.Tracef("Closing bucketReader error: %v", err) @@ -483,6 +500,7 @@ func (rr *randomReader) Object() (o *gcs.MinObject) { func (rr *randomReader) Destroy() { for _, br := range rr.bucketReaders { if br.reader != nil { + logger.Tracef("Closing bucketReader for pid: %d, numSeeks: %d, bytesRead: %d", br.pid, br.numSeeks, br.bytesRead) err := br.reader.Close() br.reader = nil if err != nil { From d2f3dc2fc9dd4d6a14a6518454e38b20a53fe806 Mon Sep 17 00:00:00 2001 From: "Wenlei (Frank) He" Date: Thu, 19 Dec 2024 22:04:26 +0000 Subject: [PATCH 08/14] add bucket reader --- internal/gcsx/random_reader.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/gcsx/random_reader.go b/internal/gcsx/random_reader.go index 25ee7123a1..9936d584c6 100644 --- a/internal/gcsx/random_reader.go +++ b/internal/gcsx/random_reader.go @@ -355,7 +355,7 @@ func (rr *randomReader) ReadAt( // TODO: remove stale bucket readers before adding new one. // NOte: it doesn't really prefetch rr.sequentialReadSizeMb that many bytes. It just keeps // a open stream that could read up to that many bytes. It only keeps about 6MB in memory. 
- br, err = rr.newBucketReader(ctx, offset, offset+int64(rr.sequentialReadSizeMb*MB)) + br, err = rr.newBucketReader(ctx, pid, offset, offset+int64(rr.sequentialReadSizeMb*MB)) if err != nil { err = fmt.Errorf("ReadAt: while creating bucket reader: %w", err) return @@ -565,9 +565,10 @@ func (rr *randomReader) readFull( func (rr *randomReader) newBucketReader( ctx context.Context, + pid uint32, start int64, end int64) (*bucketReader, error) { - + logger.Tracef("Adding a bucketReader for pid: %d, offset: %d, sizeMB: %d", pid, start, int((end-start)/MB)) // Begin the read. ctx, _ = context.WithCancel(context.Background()) rc, err := rr.bucket.NewReader( @@ -597,6 +598,7 @@ func (rr *randomReader) newBucketReader( return &bucketReader{ reader: rc, + pid: pid, start: start, end: end, lastUsed: time.Now(), From 0b634d8c1d499e7e5dab66b3a9c278ce04faefca Mon Sep 17 00:00:00 2001 From: "Wenlei (Frank) He" Date: Thu, 19 Dec 2024 22:05:12 +0000 Subject: [PATCH 09/14] add bucket reader --- internal/gcsx/random_reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/gcsx/random_reader.go b/internal/gcsx/random_reader.go index 9936d584c6..201d687565 100644 --- a/internal/gcsx/random_reader.go +++ b/internal/gcsx/random_reader.go @@ -113,7 +113,7 @@ func (br *bucketReader) isWithinRange(offset int64) bool { } func (br *bucketReader) isPidMatched(pid uint32) bool { - return pid > 0 && br.pid == pid + return br.reader != nil && pid > 0 && br.pid == pid } func (br *bucketReader) read(offset int64, p []byte) (int, error) { From c489cab3d1aed884182e1a45fb0f36a496265450 Mon Sep 17 00:00:00 2001 From: "Wenlei (Frank) He" Date: Thu, 19 Dec 2024 22:16:54 +0000 Subject: [PATCH 10/14] add bucket reader --- internal/gcsx/random_reader.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/internal/gcsx/random_reader.go b/internal/gcsx/random_reader.go index 201d687565..2496e52778 100644 --- a/internal/gcsx/random_reader.go +++ 
b/internal/gcsx/random_reader.go @@ -178,13 +178,14 @@ type randomReader struct { } // func (rr *randomReader) getBucketReader(offset int64) *bucketReader { -func (rr *randomReader) getBucketReader(pid uint32) *bucketReader { +func (rr *randomReader) getBucketReader(pid uint32, offset int64) *bucketReader { for _, br := range rr.bucketReaders { if br.reader == nil { continue } //if br.isWithinRange(offset) { - if br.isPidMatched(pid) { + // Neet to check both pid and offset as a bucket reader can only read a certain range. + if br.isPidMatched(pid) && br.isWithinRange(offset) { return br } } @@ -350,7 +351,7 @@ func (rr *randomReader) ReadAt( } //br := rr.getBucketReader(offset) - br := rr.getBucketReader(pid) + br := rr.getBucketReader(pid, offset) if br == nil { // TODO: remove stale bucket readers before adding new one. // NOte: it doesn't really prefetch rr.sequentialReadSizeMb that many bytes. It just keeps From 20ea3afac749f57274d33fb84a803b706ca9e308 Mon Sep 17 00:00:00 2001 From: "Wenlei (Frank) He" Date: Fri, 20 Dec 2024 00:28:20 +0000 Subject: [PATCH 11/14] add bucket reader --- internal/gcsx/random_reader.go | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/internal/gcsx/random_reader.go b/internal/gcsx/random_reader.go index 2496e52778..57ecf7066a 100644 --- a/internal/gcsx/random_reader.go +++ b/internal/gcsx/random_reader.go @@ -146,7 +146,9 @@ type randomReader struct { reader io.ReadCloser cancel func() - bucketReaders []*bucketReader + bucketReaders []*bucketReader + activeBucketReaders int + totalBucketReadersCreated int // The range of the object that we expect reader to yield, when reader is // non-nil. 
When reader is nil, limit is the limit of the previous read @@ -178,14 +180,13 @@ type randomReader struct { } // func (rr *randomReader) getBucketReader(offset int64) *bucketReader { -func (rr *randomReader) getBucketReader(pid uint32, offset int64) *bucketReader { +func (rr *randomReader) getBucketReader(pid uint32) *bucketReader { for _, br := range rr.bucketReaders { if br.reader == nil { continue } //if br.isWithinRange(offset) { - // Neet to check both pid and offset as a bucket reader can only read a certain range. - if br.isPidMatched(pid) && br.isWithinRange(offset) { + if br.isPidMatched(pid) { return br } } @@ -351,19 +352,33 @@ func (rr *randomReader) ReadAt( } //br := rr.getBucketReader(offset) - br := rr.getBucketReader(pid, offset) + br := rr.getBucketReader(pid) + if br != nil && !br.isWithinRange(offset) { + logger.Tracef("Closing bucketReader as offset is out of range. pid: %d, numSeeks: %d, bytesReadMB: %d", br.pid, br.numSeeks, br.bytesRead/MB) + err := br.reader.Close() + rr.activeBucketReaders-- + if err != nil { + logger.Tracef("Closing bucketReader error: %v", err) + } + // TODO: remove this bucket reader from the list. + br.reader = nil + br = nil + } + if br == nil { // TODO: remove stale bucket readers before adding new one. // NOte: it doesn't really prefetch rr.sequentialReadSizeMb that many bytes. It just keeps // a open stream that could read up to that many bytes. It only keeps about 6MB in memory. 
br, err = rr.newBucketReader(ctx, pid, offset, offset+int64(rr.sequentialReadSizeMb*MB)) + rr.activeBucketReaders++ + rr.totalBucketReadersCreated++ if err != nil { err = fmt.Errorf("ReadAt: while creating bucket reader: %w", err) return } rr.bucketReaders = append(rr.bucketReaders, br) - logger.Tracef("Adding a bucketReader, total bucketReaders: %d", len(rr.bucketReaders)) + logger.Tracef("Adding a bucketReader, active bucketReaders: %d, total bucketReaders created: %d", rr.activeBucketReaders, rr.totalBucketReadersCreated) } var tmp int tmp, err = br.read(offset, p) @@ -374,8 +389,8 @@ func (rr *randomReader) ReadAt( rr.totalReadBytes += uint64(tmp) if br.start == br.end { - logger.Tracef("Closing bucketReader for pid: %d, numSeeks: %d, bytesRead: %d", br.pid, br.numSeeks, br.bytesRead) - + logger.Tracef("Closing bucketReader as it is exhausted. pid: %d, numSeeks: %d, bytesReadMB: %d", br.pid, br.numSeeks, br.bytesRead/MB) + rr.activeBucketReaders-- err := br.reader.Close() if err != nil { logger.Tracef("Closing bucketReader error: %v", err) @@ -501,7 +516,7 @@ func (rr *randomReader) Object() (o *gcs.MinObject) { func (rr *randomReader) Destroy() { for _, br := range rr.bucketReaders { if br.reader != nil { - logger.Tracef("Closing bucketReader for pid: %d, numSeeks: %d, bytesRead: %d", br.pid, br.numSeeks, br.bytesRead) + logger.Tracef("Closing bucketReader for pid: %d, numSeeks: %d, bytesReadMB: %d", br.pid, br.numSeeks, br.bytesRead/MB) err := br.reader.Close() br.reader = nil if err != nil { From 0806eb5d2def00293c8f4355f0c36db8d1ee7bb4 Mon Sep 17 00:00:00 2001 From: "Wenlei (Frank) He" Date: Sun, 22 Dec 2024 08:39:02 +0000 Subject: [PATCH 12/14] add per pid GCS reader and clean up --- internal/gcsx/random_reader.go | 579 +++++++++++++++------------------ 1 file changed, 265 insertions(+), 314 deletions(-) diff --git a/internal/gcsx/random_reader.go b/internal/gcsx/random_reader.go index 57ecf7066a..dc9584d804 100644 --- a/internal/gcsx/random_reader.go +++ 
b/internal/gcsx/random_reader.go @@ -84,85 +84,30 @@ type RandomReader interface { // reads using the given bucket. func NewRandomReader(o *gcs.MinObject, bucket gcs.Bucket, sequentialReadSizeMb int32, fileCacheHandler *file.CacheHandler, cacheFileForRangeRead bool, metricHandle common.MetricHandle) RandomReader { return &randomReader{ - object: o, - bucket: bucket, - start: -1, - limit: -1, - seeks: 0, - totalReadBytes: 0, - sequentialReadSizeMb: sequentialReadSizeMb, + object: o, + bucket: bucket, + totalReadBytes: 0, + sequentialReadSizeMb: sequentialReadSizeMb, + // TODO(wlhe): make it a flag + perPIDReader: true, fileCacheHandler: fileCacheHandler, cacheFileForRangeRead: cacheFileForRangeRead, metricHandle: metricHandle, } } -// bucketReader holds a open stream to a GCS bucket for a given range. -type bucketReader struct { - pid uint32 - reader io.ReadCloser - start int64 - end int64 - numSeeks int64 - bytesRead int64 - lastUsed time.Time -} - -func (br *bucketReader) isWithinRange(offset int64) bool { - return br.reader != nil && offset >= br.start && offset < br.end -} - -func (br *bucketReader) isPidMatched(pid uint32) bool { - return br.reader != nil && pid > 0 && br.pid == pid -} - -func (br *bucketReader) read(offset int64, p []byte) (int, error) { - bytesToSkip := int64(offset - br.start) - skip := make([]byte, bytesToSkip) - if bytesToSkip > 0 { - br.numSeeks++ - } - n, err := io.ReadFull(br.reader, skip) - if err != nil { - return 0, err - } - br.start += int64(n) - br.bytesRead += int64(n) - - // Actually read the data into the buffer. - n, err = io.ReadFull(br.reader, p) - br.start += int64(n) - br.lastUsed = time.Now() - return n, err -} - type randomReader struct { object *gcs.MinObject bucket gcs.Bucket - // If non-nil, an in-flight read request and a function for cancelling it. 
- // - // INVARIANT: (reader == nil) == (cancel == nil) - reader io.ReadCloser - cancel func() - - bucketReaders []*bucketReader - activeBucketReaders int - totalBucketReadersCreated int - - // The range of the object that we expect reader to yield, when reader is - // non-nil. When reader is nil, limit is the limit of the previous read - // operation, or -1 if there has never been one. - // - // INVARIANT: start <= limit - // INVARIANT: limit < 0 implies reader != nil - // All these properties will be used only in case of GCS reads and not for - // reads from cache. - start int64 - limit int64 - seeks uint64 + // If perPIDReader is not set, then there is only one objectRangeReader. + // Otherwise, there is could be multiple objectRangeReaders, one per PID. + objReader *objectRangeReader + objReaders []*objectRangeReader totalReadBytes uint64 + // perPIDReader is a flag to enable per PID GCSreader. + perPIDReader bool sequentialReadSizeMb int32 // fileCacheHandler is used to get file cache handle and read happens using that. @@ -179,34 +124,9 @@ type randomReader struct { metricHandle common.MetricHandle } -// func (rr *randomReader) getBucketReader(offset int64) *bucketReader { -func (rr *randomReader) getBucketReader(pid uint32) *bucketReader { - for _, br := range rr.bucketReaders { - if br.reader == nil { - continue - } - //if br.isWithinRange(offset) { - if br.isPidMatched(pid) { - return br - } - } - return nil -} - func (rr *randomReader) CheckInvariants() { - // INVARIANT: (reader == nil) == (cancel == nil) - if (rr.reader == nil) != (rr.cancel == nil) { - panic(fmt.Sprintf("Mismatch: %v vs. 
%v", rr.reader == nil, rr.cancel == nil)) - } - - // INVARIANT: start <= limit - if !(rr.start <= rr.limit) { - panic(fmt.Sprintf("Unexpected range: [%d, %d)", rr.start, rr.limit)) - } - - // INVARIANT: limit < 0 implies reader != nil - if rr.limit < 0 && rr.reader != nil { - panic(fmt.Sprintf("Unexpected non-nil reader with limit == %d", rr.limit)) + if rr.objReader != nil { + rr.objReader.CheckInvariants() } } @@ -319,6 +239,30 @@ func captureFileCacheMetrics(ctx context.Context, metricHandle common.MetricHand metricHandle.FileCacheReadLatency(ctx, float64(readLatency.Microseconds()), []common.MetricAttr{{Key: common.CacheHit, Value: strconv.FormatBool(cacheHit)}}) } +func (rr *randomReader) getObjectRangeReader(ctx context.Context) *objectRangeReader { + if rr.perPIDReader { + readOp := ctx.Value(ReadOp).(*fuseops.ReadFileOp) + pid := int64(readOp.OpContext.Pid) + for _, objReader := range rr.objReaders { + if objReader.pid == pid { + return objReader + } + } + or := newObjectRangeReader(rr.object, rr.bucket, rr.sequentialReadSizeMb, rr.metricHandle) + or.pid = pid + // TODO(wlhe): consider limit the max number of objReaders. + rr.objReaders = append(rr.objReaders, or) + logger.Tracef("Created new %s", or.name()) + return or + } + + if rr.objReader == nil { + rr.objReader = newObjectRangeReader(rr.object, rr.bucket, rr.sequentialReadSizeMb, rr.metricHandle) + logger.Tracef("Created new %s", rr.objReader.name()) + } + return rr.objReader +} + func (rr *randomReader) ReadAt( ctx context.Context, p []byte, @@ -342,8 +286,6 @@ func (rr *randomReader) ReadAt( if cacheHit || n == len(p) || (n < len(p) && uint64(offset)+uint64(n) == rr.object.Size) { return } - readOp := ctx.Value(ReadOp).(*fuseops.ReadFileOp) - pid := readOp.OpContext.Pid for len(p) > 0 { // Have we blown past the end of the object? 
if offset >= int64(rr.object.Size) { @@ -351,203 +293,146 @@ func (rr *randomReader) ReadAt( return } - //br := rr.getBucketReader(offset) - br := rr.getBucketReader(pid) - if br != nil && !br.isWithinRange(offset) { - logger.Tracef("Closing bucketReader as offset is out of range. pid: %d, numSeeks: %d, bytesReadMB: %d", br.pid, br.numSeeks, br.bytesRead/MB) - err := br.reader.Close() - rr.activeBucketReaders-- - if err != nil { - logger.Tracef("Closing bucketReader error: %v", err) - } - // TODO: remove this bucket reader from the list. - br.reader = nil - br = nil - } + // Get the proper objectRangeReader to read data from. + reader := rr.getObjectRangeReader(ctx) - if br == nil { - // TODO: remove stale bucket readers before adding new one. - // NOte: it doesn't really prefetch rr.sequentialReadSizeMb that many bytes. It just keeps - // a open stream that could read up to that many bytes. It only keeps about 6MB in memory. - br, err = rr.newBucketReader(ctx, pid, offset, offset+int64(rr.sequentialReadSizeMb*MB)) - rr.activeBucketReaders++ - rr.totalBucketReadersCreated++ - if err != nil { - err = fmt.Errorf("ReadAt: while creating bucket reader: %w", err) - return - } - - rr.bucketReaders = append(rr.bucketReaders, br) - logger.Tracef("Adding a bucketReader, active bucketReaders: %d, total bucketReaders created: %d", rr.activeBucketReaders, rr.totalBucketReadersCreated) - } var tmp int - tmp, err = br.read(offset, p) + tmp, err = reader.readAt(ctx, p, offset) n += tmp p = p[tmp:] offset += int64(tmp) rr.totalReadBytes += uint64(tmp) + } - if br.start == br.end { - logger.Tracef("Closing bucketReader as it is exhausted. pid: %d, numSeeks: %d, bytesReadMB: %d", br.pid, br.numSeeks, br.bytesRead/MB) - rr.activeBucketReaders-- - err := br.reader.Close() - if err != nil { - logger.Tracef("Closing bucketReader error: %v", err) - } - - // TODO: remove this bucket reader from the list. - br.reader = nil - } + return +} - // Handle errors. 
- switch { - case err == io.EOF || err == io.ErrUnexpectedEOF: - // For a non-empty buffer, ReadFull returns EOF or ErrUnexpectedEOF only - // if the reader peters out early. That's fine, but it means we should - // have hit the limit above. - if rr.reader != nil { - err = fmt.Errorf("reader returned %d too few bytes", rr.limit-rr.start) - return - } +func (rr *randomReader) Object() (o *gcs.MinObject) { + o = rr.object + return +} - err = nil +func (rr *randomReader) Destroy() { + // Close out all the objectRangeReaders. + for _, or := range rr.objReaders { + or.destroy() + } + if rr.objReader != nil { + rr.objReader.destroy() + } - case err != nil: - // Propagate other errors. - err = fmt.Errorf("read error: %w", err) - return + if rr.fileCacheHandle != nil { + logger.Tracef("Closing cacheHandle:%p for object: %s:/%s", rr.fileCacheHandle, rr.bucket.Name(), rr.object.Name) + err := rr.fileCacheHandle.Close() + if err != nil { + logger.Warnf("rr.Destroy(): while closing cacheFileHandle: %v", err) } - // When the offset is AFTER the reader position, try to seek forward, within reason. - // This happens when the kernel page cache serves some data. It's very common for - // concurrent reads, often by only a few 128kB fuse read requests. The aim is to - // re-use GCS connection and avoid throwing away already read data. - // For parallel sequential reads to a single file, not throwing away the connections - // is a 15-20x improvement in throughput: 150-200 MB/s instead of 10 MB/s. - /* - if rr.reader != nil && rr.start < offset && offset-rr.start < maxReadSize { - bytesToSkip := int64(offset - rr.start) - p := make([]byte, bytesToSkip) - n, _ := io.ReadFull(rr.reader, p) - rr.start += int64(n) - } - - - // If we have an existing reader but it's positioned at the wrong place, - // clean it up and throw it away. 
- if rr.reader != nil && rr.start != offset { - rr.reader.Close() - rr.reader = nil - rr.cancel = nil - rr.seeks++ - } - - // If we don't have a reader, start a read operation. - if rr.reader == nil { - err = rr.startRead(ctx, offset, int64(len(p))) - if err != nil { - err = fmt.Errorf("startRead: %w", err) - return - } - } + rr.fileCacheHandle = nil + } +} - // Now we have a reader positioned at the correct place. Consume as much from - // it as possible. - var tmp int - tmp, err = rr.readFull(ctx, p) +type objectRangeReader struct { + object *gcs.MinObject + bucket gcs.Bucket - n += tmp - p = p[tmp:] - rr.start += int64(tmp) - offset += int64(tmp) - rr.totalReadBytes += uint64(tmp) + // If non-nil, an in-flight read request and a function for cancelling it. + // + // INVARIANT: (reader == nil) == (cancel == nil) + reader io.ReadCloser + cancel func() - // Sanity check. - if rr.start > rr.limit { - err = fmt.Errorf("reader returned %d too many bytes", rr.start-rr.limit) + // Optional. Only used when per PID reader is enabled. + pid int64 - // Don't attempt to reuse the reader when it's behaving wackily. - rr.reader.Close() - rr.reader = nil - rr.cancel = nil - rr.start = -1 - rr.limit = -1 + // The range of the object that we expect reader to yield, when reader is + // non-nil. When reader is nil, limit is the limit of the previous read + // operation, or -1 if there has never been one. + // + // INVARIANT: start <= limit + // INVARIANT: limit < 0 implies reader != nil + // All these properties will be used only in case of GCS reads and not for + // reads from cache. + start int64 + limit int64 + seeks uint64 + // Number of bytes read from the current GCS reader. + readBytes uint64 - return - } + // Total number of bytes read from GCS of all the GCS readers ever created for this object range reader. + totalReadBytes uint64 - // Are we finished with this reader now? 
- if rr.start == rr.limit { - rr.reader.Close() - rr.reader = nil - rr.cancel = nil - } + sequentialReadSizeMb int32 - // Handle errors. - switch { - case err == io.EOF || err == io.ErrUnexpectedEOF: - // For a non-empty buffer, ReadFull returns EOF or ErrUnexpectedEOF only - // if the reader peters out early. That's fine, but it means we should - // have hit the limit above. - if rr.reader != nil { - err = fmt.Errorf("reader returned %d too few bytes", rr.limit-rr.start) - return - } - - err = nil - - case err != nil: - // Propagate other errors. - err = fmt.Errorf("readFull: %w", err) - return - } + metricHandle common.MetricHandle +} - */ +func (or *objectRangeReader) name() string { + if or.pid != -1 { + return fmt.Sprintf("objectRangeReader (pid=%d)", or.pid) } - - return + return fmt.Sprintf("objectRangeReader") } -func (rr *randomReader) Object() (o *gcs.MinObject) { - o = rr.object - return +// newObjectRangeReader create a object range reader for the supplied object record that +// reads using the given bucket. +func newObjectRangeReader(o *gcs.MinObject, bucket gcs.Bucket, sequentialReadSizeMb int32, metricHandle common.MetricHandle) *objectRangeReader { + return &objectRangeReader{ + object: o, + bucket: bucket, + pid: -1, + start: -1, + limit: -1, + readBytes: 0, + seeks: 0, + totalReadBytes: 0, + sequentialReadSizeMb: sequentialReadSizeMb, + metricHandle: metricHandle, + } } -func (rr *randomReader) Destroy() { - for _, br := range rr.bucketReaders { - if br.reader != nil { - logger.Tracef("Closing bucketReader for pid: %d, numSeeks: %d, bytesReadMB: %d", br.pid, br.numSeeks, br.bytesRead/MB) - err := br.reader.Close() - br.reader = nil - if err != nil { - logger.Warnf("rr.Destroy(): while closing bucket reader: %v", err) - } - } +func (or *objectRangeReader) CheckInvariants() { + // INVARIANT: (reader == nil) == (cancel == nil) + if (or.reader == nil) != (or.cancel == nil) { + panic(fmt.Sprintf("Mismatch for %s: %v vs. 
%v", or.name(), or.reader == nil, or.cancel == nil)) } - // Close out the reader, if we have one. - if rr.reader != nil { - err := rr.reader.Close() - rr.reader = nil - rr.cancel = nil - if err != nil { - logger.Warnf("rr.Destroy(): while closing reader: %v", err) - } + + // INVARIANT: start <= limit + if !(or.start <= or.limit) { + panic(fmt.Sprintf("Unexpected range for : [%d, %d)", or.name(), or.start, or.limit)) } - if rr.fileCacheHandle != nil { - logger.Tracef("Closing cacheHandle:%p for object: %s:/%s", rr.fileCacheHandle, rr.bucket.Name(), rr.object.Name) - err := rr.fileCacheHandle.Close() + // INVARIANT: limit < 0 implies reader != nil + if or.limit < 0 && or.reader != nil { + panic(fmt.Sprintf("Unexpected non-nil reader with limit == %d", or.limit)) + } +} + +func (or *objectRangeReader) readBytesMB() float32 { + return float32(or.readBytes) / MB +} + +func (or *objectRangeReader) totalReadBytesMB() float32 { + return float32(or.totalReadBytes) / MB +} + +func (or *objectRangeReader) destroy() { + if or.reader != nil { + logger.Tracef("%s closed the GCS reader, seeks: %d, readBytesMB: %.2f", or.name(), or.seeks, or.readBytesMB()) + logger.Tracef("Destroying %s, seeks: %d, totalReadBytesMB: %.2f", or.name(), or.seeks, or.totalReadBytesMB()) + err := or.reader.Close() + or.reader = nil if err != nil { - logger.Warnf("rr.Destroy(): while closing cacheFileHandle: %v", err) + logger.Warnf("%d dstroy error: %v", or.name(), err) } - rr.fileCacheHandle = nil + or.cancel = nil } } // Like io.ReadFull, but deals with the cancellation issues. // // REQUIRES: rr.reader != nil -func (rr *randomReader) readFull( +func (or *objectRangeReader) readFull( ctx context.Context, p []byte) (n int, err error) { // Start a goroutine that will cancel the read operation we block on below if @@ -568,73 +453,136 @@ func (rr *randomReader) readFull( return default: - rr.cancel() + or.cancel() } } }() // Call through. 
- n, err = io.ReadFull(rr.reader, p) + n, err = io.ReadFull(or.reader, p) return } -func (rr *randomReader) newBucketReader( +func (or *objectRangeReader) readAt( ctx context.Context, - pid uint32, - start int64, - end int64) (*bucketReader, error) { - logger.Tracef("Adding a bucketReader for pid: %d, offset: %d, sizeMB: %d", pid, start, int((end-start)/MB)) - // Begin the read. - ctx, _ = context.WithCancel(context.Background()) - rc, err := rr.bucket.NewReader( - ctx, - &gcs.ReadObjectRequest{ - Name: rr.object.Name, - Generation: rr.object.Generation, - Range: &gcs.ByteRange{ - Start: uint64(start), - Limit: uint64(end), - }, - ReadCompressed: rr.object.HasContentEncodingGzip(), - }) + p []byte, + offset int64) (n int, err error) { + + // When the offset is AFTER the reader position, try to seek forward, within reason. + // This happens when the kernel page cache serves some data. It's very common for + // concurrent reads, often by only a few 128kB fuse read requests. The aim is to + // re-use GCS connection and avoid throwing away already read data. + // For parallel sequential reads to a single file, not throwing away the connections + // is a 15-20x improvement in throughput: 150-200 MB/s instead of 10 MB/s. + + logger.Tracef("%s start: %d,limit: %d, offset: %d", or.name(), or.start, or.limit, offset) + + //if or.reader != nil && or.start < offset && offset < or.limit { + if or.reader != nil && or.start < offset && offset-or.start < maxReadSize { + bytesToSkip := int64(offset - or.start) + skipBuffer := make([]byte, bytesToSkip) + bytesRead, _ := io.ReadFull(or.reader, skipBuffer) + or.start += int64(bytesRead) + logger.Tracef("%s seek forward bytesMB: %.2f, start: %d", or.name(), float32(bytesRead)/MB, or.start) + } - // If a file handle is open locally, but the corresponding object doesn't exist - // in GCS, it indicates a file clobbering scenario. This likely occurred because: - // - The file was deleted in GCS while a local handle was still open. 
- // - The file content was modified leading to different generation number. - var notFoundError *gcs.NotFoundError - if errors.As(err, &notFoundError) { - return nil, &gcsfuse_errors.FileClobberedError{ - Err: fmt.Errorf("NewReader: %w", err), + // If we have an existing reader but it's positioned at the wrong place, + // clean it up and throw it away. + if or.reader != nil && or.start != offset { + or.reader.Close() + or.reader = nil + or.cancel = nil + // TODO(wlhe): remove this experiment. + //or.seeks++ + if or.start > offset { + logger.Tracef("%s closed the GCS reader due to seeking back, seeks: %d, readBytesMB: %.2f", or.name(), or.seeks, or.readBytesMB()) + } else if offset >= or.limit { + logger.Tracef("%s closed the GCS reader due to seeking too far forward, seeks: %d, readBytesMB: %.2f", or.name(), or.seeks, or.readBytesMB()) + } else { + logger.Tracef("%s closed the GCS reader due to wrong position, seeks: %d, readBytesMB: %.2f", or.name(), or.seeks, or.readBytesMB()) + } + } + + // If we don't have a reader, start a new GCS reader for the given range. + if or.reader == nil { + err = or.newGCSReader(ctx, offset, int64(len(p))) + if err != nil { + err = fmt.Errorf("%s newGCSReader error: %v", or.name(), err) + return } } - common.CaptureGCSReadMetrics(ctx, rr.metricHandle, util.Sequential, end-start) + // Now we have a reader positioned at the correct place. Consume as much from + // it as possible. + n, err = or.readFull(ctx, p) + + or.start += int64(n) + or.readBytes += uint64(n) + or.totalReadBytes += uint64(n) + + // Sanity check. + if or.start > or.limit { + err = fmt.Errorf("%s reader returned %d too many bytes", or.name(), or.start-or.limit) + logger.Tracef("%s closed the GCS reader due to too many bytes returned, seeks: %d, readBytesMB: %.2f", or.name(), or.seeks, or.readBytesMB()) + + // Don't attempt to reuse the reader when it's behaving wackily. 
+ or.reader.Close() + or.reader = nil + or.cancel = nil + or.start = -1 + or.limit = -1 + or.readBytes = 0 + + return + } + + // Are we finished with this reader now? + if or.start == or.limit { + logger.Tracef("%s closed the GCS reader due to byte limit reached: %d, readBytesMB: %.2f", or.name(), or.seeks, or.readBytesMB()) - return &bucketReader{ - reader: rc, - pid: pid, - start: start, - end: end, - lastUsed: time.Now(), - }, nil + or.reader.Close() + or.reader = nil + or.cancel = nil + } + + // Handle errors. + switch { + case err == io.EOF || err == io.ErrUnexpectedEOF: + // For a non-empty buffer, ReadFull returns EOF or ErrUnexpectedEOF only + // if the reader peters out early. That's fine, but it means we should + // have hit the limit above. + if or.reader != nil { + err = fmt.Errorf("%s reader returned %d too few bytes", or.name(), or.limit-or.start) + return + } + + err = nil + + case err != nil: + // Propagate other errors. + err = fmt.Errorf("%sreadFull: %v", or.name(), err) + return + } + return } -// Ensure that rr.reader is set up for a range for which [start, start+size) is +// Create a new GCS reader for the given range for the objectRangeReader. +// +// Ensure that or.reader is set up for a range for which [start, start+size) is // a prefix. Irrespective of the size requested, we try to fetch more data // from GCS defined by sequentialReadSizeMb flag to serve future read requests. -func (rr *randomReader) startRead( +func (or *objectRangeReader) newGCSReader( ctx context.Context, start int64, size int64) (err error) { // Make sure start and size are legal. 
- if start < 0 || uint64(start) > rr.object.Size || size < 0 { + if start < 0 || uint64(start) > or.object.Size || size < 0 { err = fmt.Errorf( "range [%d, %d) is illegal for %d-byte object", start, start+size, - rr.object.Size) + or.object.Size) return } @@ -649,11 +597,11 @@ func (rr *randomReader) startRead( // But if we notice random read patterns after a minimum number of seeks, // optimise for random reads. Random reads will read data in chunks of // (average read size in bytes rounded up to the next MB). - end := int64(rr.object.Size) + end := int64(or.object.Size) readType := util.Sequential - if rr.seeks >= minSeeksForRandom { + if or.seeks >= minSeeksForRandom { readType = util.Random - averageReadBytes := rr.totalReadBytes / rr.seeks + averageReadBytes := or.totalReadBytes / or.seeks if averageReadBytes < maxReadSize { randomReadSize := int64(((averageReadBytes / MB) + 1) * MB) if randomReadSize < minReadSize { @@ -665,29 +613,31 @@ func (rr *randomReader) startRead( end = start + randomReadSize } } - if end > int64(rr.object.Size) { - end = int64(rr.object.Size) + if end > int64(or.object.Size) { + end = int64(or.object.Size) } // To avoid overloading GCS and to have reasonable latencies, we will only // fetch data of max size defined by sequentialReadSizeMb. - maxSizeToReadFromGCS := int64(rr.sequentialReadSizeMb * MB) + maxSizeToReadFromGCS := int64(or.sequentialReadSizeMb * MB) if end-start > maxSizeToReadFromGCS { end = start + maxSizeToReadFromGCS } + logger.Tracef("%s creating a new GCS bucket reader, offset: %d, sizeMB: %d", or.name(), start, (end-start)/MB) // Begin the read. + // Use a Background context to keep the GCS stream open. 
+ ctx, cancel := context.WithCancel(context.Background()) - rc, err := rr.bucket.NewReader( + rc, err := or.bucket.NewReader( ctx, &gcs.ReadObjectRequest{ - Name: rr.object.Name, - Generation: rr.object.Generation, + Name: or.object.Name, + Generation: or.object.Generation, Range: &gcs.ByteRange{ Start: uint64(start), Limit: uint64(end), }, - ReadCompressed: rr.object.HasContentEncodingGzip(), + ReadCompressed: or.object.HasContentEncodingGzip(), }) // If a file handle is open locally, but the corresponding object doesn't exist @@ -697,23 +647,24 @@ func (rr *randomReader) startRead( var notFoundError *gcs.NotFoundError if errors.As(err, &notFoundError) { err = &gcsfuse_errors.FileClobberedError{ - Err: fmt.Errorf("NewReader: %w", err), + Err: fmt.Errorf("%s newGCSReader: %w", or.name(), err), } return } if err != nil { - err = fmt.Errorf("NewReader: %w", err) + err = fmt.Errorf("%s newGCSReader: %w", or.name(), err) return } - rr.reader = rc - rr.cancel = cancel - rr.start = start - rr.limit = end + or.reader = rc + or.cancel = cancel + or.start = start + or.limit = end + or.readBytes = 0 requestedDataSize := end - start - common.CaptureGCSReadMetrics(ctx, rr.metricHandle, readType, requestedDataSize) + common.CaptureGCSReadMetrics(ctx, or.metricHandle, readType, requestedDataSize) return } From a7a1f87451d594a3a396af983588b496efe8f0ab Mon Sep 17 00:00:00 2001 From: "Wenlei (Frank) He" Date: Sun, 22 Dec 2024 17:32:51 +0000 Subject: [PATCH 13/14] add per pid GCS reader and clean up --- internal/gcsx/random_reader.go | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/internal/gcsx/random_reader.go b/internal/gcsx/random_reader.go index dc9584d804..84efac02d1 100644 --- a/internal/gcsx/random_reader.go +++ b/internal/gcsx/random_reader.go @@ -416,10 +416,14 @@ func (or *objectRangeReader) totalReadBytesMB() float32 { return float32(or.totalReadBytes) / MB } +func (or *objectRangeReader) gcsReaderStats() string { + return 
fmt.Sprintf("readBytesMB: %.2f, totalReadBytesMB: %.2f, seeks: %d", or.readBytesMB(), or.totalReadBytesMB(), or.seeks) +} + func (or *objectRangeReader) destroy() { if or.reader != nil { - logger.Tracef("%s closed the GCS reader, seeks: %d, readBytesMB: %.2f", or.name(), or.seeks, or.readBytesMB()) - logger.Tracef("Destroying %s, seeks: %d, totalReadBytesMB: %.2f", or.name(), or.seeks, or.totalReadBytesMB()) + logger.Tracef("%s closed the GCS reader at final destruction, %s", or.name(), or.gcsReaderStats()) + logger.Tracef("Destroying %s, totalReadBytesMB: %.2f, seeks: %d", or.name(), or.totalReadBytesMB(), or.seeks) err := or.reader.Close() or.reader = nil if err != nil { @@ -476,7 +480,7 @@ func (or *objectRangeReader) readAt( // For parallel sequential reads to a single file, not throwing away the connections // is a 15-20x improvement in throughput: 150-200 MB/s instead of 10 MB/s. - logger.Tracef("%s start: %d,limit: %d, offset: %d", or.name(), or.start, or.limit, offset) + logger.Tracef("%s readAt, start: %d, limit: %d, offset: %d", or.name(), or.start, or.limit, offset) //if or.reader != nil && or.start < offset && offset < or.limit { if or.reader != nil && or.start < offset && offset-or.start < maxReadSize { @@ -496,11 +500,13 @@ func (or *objectRangeReader) readAt( // TODO(wlhe): remove this experiment. 
//or.seeks++ if or.start > offset { - logger.Tracef("%s closed the GCS reader due to seeking back, seeks: %d, readBytesMB: %.2f", or.name(), or.seeks, or.readBytesMB()) + logger.Tracef("%s closed the GCS reader due to seeking back, %s", or.name(), or.gcsReaderStats()) + } else if offset-or.start >= maxReadSize { + logger.Tracef("%s closed the GCS reader due to seeking forward for more than %d MB, %s", or.name(), maxReadSize/MB, or.gcsReaderStats()) } else if offset >= or.limit { - logger.Tracef("%s closed the GCS reader due to seeking too far forward, seeks: %d, readBytesMB: %.2f", or.name(), or.seeks, or.readBytesMB()) + logger.Tracef("%s closed the GCS reader due to seeking beyond the range, %s", or.name(), or.gcsReaderStats()) } else { - logger.Tracef("%s closed the GCS reader due to wrong position, seeks: %d, readBytesMB: %.2f", or.name(), or.seeks, or.readBytesMB()) + logger.Tracef("%s closed the GCS reader due to wrong position, %s", or.name(), or.gcsReaderStats()) } } @@ -524,7 +530,7 @@ func (or *objectRangeReader) readAt( // Sanity check. if or.start > or.limit { err = fmt.Errorf("%s reader returned %d too many bytes", or.name(), or.start-or.limit) - logger.Tracef("%s closed the GCS reader due to too many bytes returned, seeks: %d, readBytesMB: %.2f", or.name(), or.seeks, or.readBytesMB()) + logger.Tracef("%s closed the GCS reader due to too many bytes returned, %s", or.name(), or.gcsReaderStats()) // Don't attempt to reuse the reader when it's behaving wackily. or.reader.Close() @@ -539,7 +545,7 @@ func (or *objectRangeReader) readAt( // Are we finished with this reader now? 
if or.start == or.limit { - logger.Tracef("%s closed the GCS reader due to byte limit reached: %d, readBytesMB: %.2f", or.name(), or.seeks, or.readBytesMB()) + logger.Tracef("%s closed the GCS reader due to byte limit reached: %d, %s", or.name(), or.limit, or.gcsReaderStats()) or.reader.Close() or.reader = nil From b5a775f0d705e3f646409552797c910613f2a3ae Mon Sep 17 00:00:00 2001 From: "Wenlei (Frank) He" Date: Sun, 22 Dec 2024 19:46:28 +0000 Subject: [PATCH 14/14] add per pid GCS reader and clean up --- internal/gcsx/random_reader.go | 66 ++++++++++++++++++++++------------ 1 file changed, 44 insertions(+), 22 deletions(-) diff --git a/internal/gcsx/random_reader.go b/internal/gcsx/random_reader.go index 84efac02d1..75d61254ca 100644 --- a/internal/gcsx/random_reader.go +++ b/internal/gcsx/random_reader.go @@ -88,11 +88,12 @@ func NewRandomReader(o *gcs.MinObject, bucket gcs.Bucket, sequentialReadSizeMb i bucket: bucket, totalReadBytes: 0, sequentialReadSizeMb: sequentialReadSizeMb, - // TODO(wlhe): make it a flag - perPIDReader: true, - fileCacheHandler: fileCacheHandler, - cacheFileForRangeRead: cacheFileForRangeRead, - metricHandle: metricHandle, + // TODO(wlhe): make them flags + perPIDReader: true, + minSequentialReadSizeMb: 200, + fileCacheHandler: fileCacheHandler, + cacheFileForRangeRead: cacheFileForRangeRead, + metricHandle: metricHandle, } } @@ -110,6 +111,9 @@ type randomReader struct { perPIDReader bool sequentialReadSizeMb int32 + // minSequentialReadSizeMb is the minimum size of sequential read size when creating a GCS reader. + minSequentialReadSizeMb int32 + // fileCacheHandler is used to get file cache handle and read happens using that. // This will be nil if the file cache is disabled. 
fileCacheHandler *file.CacheHandler @@ -248,7 +252,7 @@ func (rr *randomReader) getObjectRangeReader(ctx context.Context) *objectRangeRe return objReader } } - or := newObjectRangeReader(rr.object, rr.bucket, rr.sequentialReadSizeMb, rr.metricHandle) + or := newObjectRangeReader(rr.object, rr.bucket, rr.sequentialReadSizeMb, rr.minSequentialReadSizeMb, rr.metricHandle) or.pid = pid // TODO(wlhe): consider limit the max number of objReaders. rr.objReaders = append(rr.objReaders, or) @@ -257,7 +261,7 @@ func (rr *randomReader) getObjectRangeReader(ctx context.Context) *objectRangeRe } if rr.objReader == nil { - rr.objReader = newObjectRangeReader(rr.object, rr.bucket, rr.sequentialReadSizeMb, rr.metricHandle) + rr.objReader = newObjectRangeReader(rr.object, rr.bucket, rr.sequentialReadSizeMb, rr.minSequentialReadSizeMb, rr.metricHandle) logger.Tracef("Created new %s", rr.objReader.name()) } return rr.objReader @@ -364,6 +368,9 @@ type objectRangeReader struct { sequentialReadSizeMb int32 + // minSequentialReadSizeMb is the minimum size of sequential read size when creating a GCS reader. + minSequentialReadSizeMb int32 + metricHandle common.MetricHandle } @@ -376,18 +383,19 @@ func (or *objectRangeReader) name() string { // newObjectRangeReader create a object range reader for the supplied object record that // reads using the given bucket. 
-func newObjectRangeReader(o *gcs.MinObject, bucket gcs.Bucket, sequentialReadSizeMb int32, metricHandle common.MetricHandle) *objectRangeReader { +func newObjectRangeReader(o *gcs.MinObject, bucket gcs.Bucket, sequentialReadSizeMb int32, minSequentialReadSizeMb int32, metricHandle common.MetricHandle) *objectRangeReader { return &objectRangeReader{ - object: o, - bucket: bucket, - pid: -1, - start: -1, - limit: -1, - readBytes: 0, - seeks: 0, - totalReadBytes: 0, - sequentialReadSizeMb: sequentialReadSizeMb, - metricHandle: metricHandle, + object: o, + bucket: bucket, + pid: -1, + start: -1, + limit: -1, + readBytes: 0, + seeks: 0, + totalReadBytes: 0, + sequentialReadSizeMb: sequentialReadSizeMb, + minSequentialReadSizeMb: minSequentialReadSizeMb, + metricHandle: metricHandle, } } @@ -479,10 +487,14 @@ func (or *objectRangeReader) readAt( // re-use GCS connection and avoid throwing away already read data. // For parallel sequential reads to a single file, not throwing away the connections // is a 15-20x improvement in throughput: 150-200 MB/s instead of 10 MB/s. + // + // It doesn't reuse the connection when seeking forward for more than maxReadSize before + // seeking forward is actually done reading by discarding all the bytes between start and offset. + // If the gap is too large, it's likely that it's slower than just creating a new connection. + // at the new offset. logger.Tracef("%s readAt, start: %d, limit: %d, offset: %d", or.name(), or.start, or.limit, offset) - //if or.reader != nil && or.start < offset && offset < or.limit { if or.reader != nil && or.start < offset && offset-or.start < maxReadSize { bytesToSkip := int64(offset - or.start) skipBuffer := make([]byte, bytesToSkip) @@ -497,8 +509,7 @@ func (or *objectRangeReader) readAt( or.reader.Close() or.reader = nil or.cancel = nil - // TODO(wlhe): remove this experiment. 
- //or.seeks++ + or.seeks++ if or.start > offset { logger.Tracef("%s closed the GCS reader due to seeking back, %s", or.name(), or.gcsReaderStats()) } else if offset-or.start >= maxReadSize { @@ -605,17 +616,28 @@ func (or *objectRangeReader) newGCSReader( // (average read size in bytes rounded up to the next MB). end := int64(or.object.Size) readType := util.Sequential + logger.Tracef("%s calculating the read size for the new GCS reader, totalReadBytesMB: %.2f, seeks: %d", or.name(), or.totalReadBytesMB(), or.seeks) + if or.seeks >= minSeeksForRandom { readType = util.Random averageReadBytes := or.totalReadBytes / or.seeks + logger.Tracef("%s calculating the read size for the new GCS reader, averageReadBytesMB: %d", or.name(), averageReadBytes/MB) + if averageReadBytes < maxReadSize { randomReadSize := int64(((averageReadBytes / MB) + 1) * MB) + logger.Tracef("%s calculating the read size for the new GCS reader, randomReadSizeMB: %d, minReadSizeMB: %d, maxReadSizeMB: %d", or.name(), randomReadSize/MB, minReadSize/MB, maxReadSize/MB) + if randomReadSize < minReadSize { randomReadSize = minReadSize } if randomReadSize > maxReadSize { randomReadSize = maxReadSize } + + if randomReadSize < int64(or.minSequentialReadSizeMb*MB) { + logger.Tracef("%s overriding the read size for the new GCS reader, original sizeMB: %d, new sizeMB: %d", or.name(), randomReadSize/MB, or.minSequentialReadSizeMb) + randomReadSize = int64(or.minSequentialReadSizeMb * MB) + } end = start + randomReadSize } } @@ -629,7 +651,7 @@ func (or *objectRangeReader) newGCSReader( if end-start > maxSizeToReadFromGCS { end = start + maxSizeToReadFromGCS } - logger.Tracef("%s creating a new GCS bucket reader, offset: %d, sizeMB: %d", or.name(), start, (end-start)/MB) + logger.Tracef("%s creating a new GCS reader, offset: %d, sizeMB: %d", or.name(), start, (end-start)/MB) // Begin the read. // Use a Background context to keep the GCS stream open.