Skip to content

Commit

Permalink
Merge pull request #145 from ashwanthgoli/get-include-size
Browse files Browse the repository at this point in the history
Include content length in the response of Get and GetRange
  • Loading branch information
fpetkovski authored Oct 15, 2024
2 parents f90c89a + dfed39a commit 5f04b8b
Show file tree
Hide file tree
Showing 14 changed files with 174 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

## Unreleased
- [#38](https://github.com/thanos-io/objstore/pull/38) GCS: Upgrade cloud.google.com/go/storage version to `v1.43.0`.
- [#145](https://github.com/thanos-io/objstore/pull/145) Include content length in the response of Get and GetRange.

### Fixed
- [#117](https://github.com/thanos-io/objstore/pull/117) Metrics: Fix `objstore_bucket_operation_failures_total` incorrectly incremented if context is cancelled while reading object contents.
Expand Down
32 changes: 27 additions & 5 deletions inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,12 @@ func (b *InMemBucket) Get(_ context.Context, name string) (io.ReadCloser, error)
return nil, errNotFound
}

return io.NopCloser(bytes.NewReader(file)), nil
return ObjectSizerReadCloser{
ReadCloser: io.NopCloser(bytes.NewReader(file)),
Size: func() (int64, error) {
return int64(len(file)), nil
},
}, nil
}

// GetRange returns a new range reader for the given object name and range.
Expand All @@ -136,23 +141,40 @@ func (b *InMemBucket) GetRange(_ context.Context, name string, off, length int64
}

if int64(len(file)) < off {
return io.NopCloser(bytes.NewReader(nil)), nil
return ObjectSizerReadCloser{
ReadCloser: io.NopCloser(bytes.NewReader(nil)),
Size: func() (int64, error) { return 0, nil },
}, nil
}

if length == -1 {
return io.NopCloser(bytes.NewReader(file[off:])), nil
return ObjectSizerReadCloser{
ReadCloser: io.NopCloser(bytes.NewReader(file[off:])),
Size: func() (int64, error) {
return int64(len(file[off:])), nil
},
}, nil
}

if length <= 0 {
return io.NopCloser(bytes.NewReader(nil)), errors.New("length cannot be smaller or equal 0")
// wrap with ObjectSizerReadCloser to return 0 size.
return ObjectSizerReadCloser{
ReadCloser: io.NopCloser(bytes.NewReader(nil)),
Size: func() (int64, error) { return 0, nil },
}, errors.New("length cannot be smaller or equal 0")
}

if int64(len(file)) <= off+length {
// Just return maximum of what we have.
length = int64(len(file)) - off
}

return io.NopCloser(bytes.NewReader(file[off : off+length])), nil
return ObjectSizerReadCloser{
ReadCloser: io.NopCloser(bytes.NewReader(file[off : off+length])),
Size: func() (int64, error) {
return length, nil
},
}, nil
}

// Exists checks if the given directory exists in memory.
Expand Down
14 changes: 14 additions & 0 deletions objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,3 +829,17 @@ func (t *timingReaderWriterTo) WriteTo(w io.Writer) (n int64, err error) {
t.timingReader.updateMetrics(int(n), err)
return n, err
}

type ObjectSizerReadCloser struct {
io.ReadCloser
Size func() (int64, error)
}

// ObjectSize implement ObjectSizer.
func (o ObjectSizerReadCloser) ObjectSize() (int64, error) {
if o.Size == nil {
return 0, errors.New("unknown size")
}

return o.Size()
}
8 changes: 7 additions & 1 deletion providers/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,13 @@ func (b *Bucket) getBlobReader(ctx context.Context, name string, httpRange blob.
return nil, errors.Wrapf(err, "cannot download blob, address: %s", blobClient.URL())
}
retryOpts := azblob.RetryReaderOptions{MaxRetries: int32(b.readerMaxRetries)}
return resp.NewRetryReader(ctx, &retryOpts), nil

return objstore.ObjectSizerReadCloser{
ReadCloser: resp.NewRetryReader(ctx, &retryOpts),
Size: func() (int64, error) {
return *resp.ContentLength, nil
},
}, nil
}

// Get returns a reader for the given object name.
Expand Down
7 changes: 6 additions & 1 deletion providers/bos/bos.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,12 @@ func (b *Bucket) getRange(_ context.Context, bucketName, objectKey string, off,
return nil, err
}

return obj.Body, nil
return objstore.ObjectSizerReadCloser{
ReadCloser: obj.Body,
Size: func() (int64, error) {
return obj.ContentLength, nil
},
}, err
}

func configFromEnv() Config {
Expand Down
18 changes: 6 additions & 12 deletions providers/cos/cos.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,18 +320,12 @@ func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (
return nil, err
}
// Add size info into reader to pass it to Upload function.
r := objectSizerReadCloser{ReadCloser: resp.Body, size: resp.ContentLength}
return r, nil
}

type objectSizerReadCloser struct {
io.ReadCloser
size int64
}

// ObjectSize implement objstore.ObjectSizer.
func (o objectSizerReadCloser) ObjectSize() (int64, error) {
return o.size, nil
return objstore.ObjectSizerReadCloser{
ReadCloser: resp.Body,
Size: func() (int64, error) {
return resp.ContentLength, nil
},
}, nil
}

// Get returns a reader for the given object name.
Expand Down
29 changes: 24 additions & 5 deletions providers/filesystem/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,12 @@ func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (
return nil, errors.New("object name is empty")
}

file := filepath.Join(b.rootDir, name)
if _, err := os.Stat(file); err != nil {
var (
file = filepath.Join(b.rootDir, name)
stat os.FileInfo
err error
)
if stat, err = os.Stat(file); err != nil {
return nil, errors.Wrapf(err, "stat %s", file)
}

Expand All @@ -160,18 +164,33 @@ func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (
return nil, err
}

var newOffset int64
if off > 0 {
_, err := f.Seek(off, 0)
newOffset, err = f.Seek(off, 0)
if err != nil {
return nil, errors.Wrapf(err, "seek %v", off)
}
}

size := stat.Size() - newOffset
if length == -1 {
return f, nil
return objstore.ObjectSizerReadCloser{
ReadCloser: f,
Size: func() (int64, error) {
return size, nil
},
}, nil
}

return &rangeReaderCloser{Reader: io.LimitReader(f, length), f: f}, nil
return objstore.ObjectSizerReadCloser{
ReadCloser: &rangeReaderCloser{
Reader: io.LimitReader(f, length),
f: f,
},
Size: func() (int64, error) {
return min(length, size), nil
},
}, nil
}

// Exists checks if the given directory exists in memory.
Expand Down
25 changes: 23 additions & 2 deletions providers/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,33 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt

// Get returns a reader for the given object name.
func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
return b.bkt.Object(name).NewReader(ctx)
r, err := b.bkt.Object(name).NewReader(ctx)
if err != nil {
return r, err
}

return objstore.ObjectSizerReadCloser{
ReadCloser: r,
Size: func() (int64, error) {
return r.Attrs.Size, nil
},
}, nil
}

// GetRange returns a new range reader for the given object name and range.
func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
return b.bkt.Object(name).NewRangeReader(ctx, off, length)
r, err := b.bkt.Object(name).NewRangeReader(ctx, off, length)
if err != nil {
return r, err
}

sz := r.Remain()
return objstore.ObjectSizerReadCloser{
ReadCloser: r,
Size: func() (int64, error) {
return sz, nil
},
}, nil
}

// Attributes returns information about the specified object.
Expand Down
7 changes: 6 additions & 1 deletion providers/obs/obs.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,12 @@ func (b *Bucket) getRange(_ context.Context, name string, off, length int64) (io
if err != nil {
return nil, errors.Wrap(err, "failed to get object")
}
return output.Body, nil
return objstore.ObjectSizerReadCloser{
ReadCloser: output.Body,
Size: func() (int64, error) {
return output.ContentLength, nil
},
}, nil
}

// Exists checks if the given object exists in the bucket.
Expand Down
13 changes: 11 additions & 2 deletions providers/oci/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,12 @@ func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
if err != nil {
return nil, err
}
return response.Content, nil
return objstore.ObjectSizerReadCloser{
ReadCloser: response.Content,
Size: func() (int64, error) {
return *response.ContentLength, nil
},
}, nil
}

// GetRange returns a new range reader for the given object name and range.
Expand Down Expand Up @@ -164,7 +169,11 @@ func (b *Bucket) GetRange(ctx context.Context, name string, offset, length int64
if err != nil {
return nil, err
}
return response.Content, nil
return objstore.ObjectSizerReadCloser{ReadCloser: response.Content,
Size: func() (int64, error) {
return *response.ContentLength, nil
},
}, nil
}

// Upload the contents of the reader as an object into the bucket.
Expand Down
15 changes: 13 additions & 2 deletions providers/oss/oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"testing"
"time"

"github.com/aliyun/aliyun-oss-go-sdk/oss"
alioss "github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/go-kit/log"
"github.com/pkg/errors"
Expand Down Expand Up @@ -342,12 +343,22 @@ func (b *Bucket) getRange(_ context.Context, name string, off, length int64) (io
opts = append(opts, opt)
}

resp, err := b.bucket.GetObject(name, opts...)
resp, err := b.bucket.DoGetObject(&oss.GetObjectRequest{ObjectKey: name}, opts)
if err != nil {
return nil, err
}

return resp, nil
size, err := clientutil.ParseContentLength(resp.Response.Headers)
if err == nil {
return objstore.ObjectSizerReadCloser{
ReadCloser: resp.Response,
Size: func() (int64, error) {
return size, nil
},
}, nil
}

return resp.Response, nil
}

// Get returns a reader for the given object name.
Expand Down
12 changes: 11 additions & 1 deletion providers/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,17 @@ func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (
return nil, err
}

return r, nil
return objstore.ObjectSizerReadCloser{
ReadCloser: r,
Size: func() (int64, error) {
stat, err := r.Stat()
if err != nil {
return 0, err
}

return stat.Size, nil
},
}, nil
}

// Get returns a reader for the given object name.
Expand Down
6 changes: 5 additions & 1 deletion providers/swift/swift.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,11 @@ func (c *Container) get(name string, headers swift.Headers, checkHash bool) (io.
if err != nil {
return nil, errors.Wrap(err, "open object")
}
return file, err

return objstore.ObjectSizerReadCloser{
ReadCloser: file,
Size: file.Length,
}, nil
}

// Get returns a reader for the given object name.
Expand Down
20 changes: 20 additions & 0 deletions testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ func AcceptanceTest(t *testing.T, bkt Bucket) {
rc1, err := bkt.Get(ctx, "id1/obj_1.some")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, rc1.Close()) }()

sz, err := TryToGetSize(rc1)
testutil.Ok(t, err)
testutil.Equals(t, int64(11), sz, "expected size to be equal to 11")

content, err := io.ReadAll(rc1)
testutil.Ok(t, err)
testutil.Equals(t, "@test-data@", string(content))
Expand All @@ -118,6 +123,11 @@ func AcceptanceTest(t *testing.T, bkt Bucket) {
rc2, err := bkt.GetRange(ctx, "id1/obj_1.some", 1, 3)
testutil.Ok(t, err)
defer func() { testutil.Ok(t, rc2.Close()) }()

sz, err = TryToGetSize(rc2)
testutil.Ok(t, err)
testutil.Equals(t, int64(3), sz, "expected size to be equal to 3")

content, err = io.ReadAll(rc2)
testutil.Ok(t, err)
testutil.Equals(t, "tes", string(content))
Expand All @@ -126,6 +136,11 @@ func AcceptanceTest(t *testing.T, bkt Bucket) {
rcUnspecifiedLen, err := bkt.GetRange(ctx, "id1/obj_1.some", 1, -1)
testutil.Ok(t, err)
defer func() { testutil.Ok(t, rcUnspecifiedLen.Close()) }()

sz, err = TryToGetSize(rcUnspecifiedLen)
testutil.Ok(t, err)
testutil.Equals(t, int64(10), sz, "expected size to be equal to 10")

content, err = io.ReadAll(rcUnspecifiedLen)
testutil.Ok(t, err)
testutil.Equals(t, "test-data@", string(content))
Expand All @@ -141,6 +156,11 @@ func AcceptanceTest(t *testing.T, bkt Bucket) {
rcLength, err := bkt.GetRange(ctx, "id1/obj_1.some", 3, 9999)
testutil.Ok(t, err)
defer func() { testutil.Ok(t, rcLength.Close()) }()

sz, err = TryToGetSize(rcLength)
testutil.Ok(t, err)
testutil.Equals(t, int64(8), sz, "expected size to be equal to 8")

content, err = io.ReadAll(rcLength)
testutil.Ok(t, err)
testutil.Equals(t, "st-data@", string(content))
Expand Down

0 comments on commit 5f04b8b

Please sign in to comment.