Skip to content

Commit

Permalink
Merge pull request #96 from danielblando/nopSeeker
Browse files Browse the repository at this point in the history
Add seeker option for nopCloserWithObjectSize
  • Loading branch information
yeya24 authored Jan 16, 2024
2 parents 61cfed8 + ab3da4d commit 6ecabdd
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#85](https://github.com/thanos-io/objstore/pull/85) S3: Allow checksum algorithm to be configured
- [#92](https://github.com/thanos-io/objstore/pull/92) GCS: Allow using a gRPC client.
- [#94](https://github.com/thanos-io/objstore/pull/94) Allow timingReadCloser to be seeker
- [#96](https://github.com/thanos-io/objstore/pull/96) Allow nopCloserWithObjectSize to be seeker

### Changed
- [#38](https://github.com/thanos-io/objstore/pull/38) *: Upgrade minio-go version to `v7.0.45`.
Expand Down
23 changes: 22 additions & 1 deletion objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,19 @@ func NopCloserWithSize(r io.Reader) io.ReadCloser {
return nopCloserWithObjectSize{r}
}

// nopSeekerCloserWithObjectSize wraps an io.Reader that is also an io.Seeker,
// providing a no-op Close, an ObjectSize derived from the underlying reader,
// and Seek delegation to the wrapped reader.
type nopSeekerCloserWithObjectSize struct{ io.Reader }

// Close is a no-op; the wrapped reader owns any underlying resources.
func (nopSeekerCloserWithObjectSize) Close() error { return nil }

// ObjectSize reports the size of the wrapped reader's content, if determinable.
func (n nopSeekerCloserWithObjectSize) ObjectSize() (int64, error) { return TryToGetSize(n.Reader) }

// errNotSeekable is returned by Seek when the wrapped reader does not
// implement io.Seeker. nopSeekerCloserWithSize should only be given seekable
// readers, so this is a defensive guard rather than an expected path.
type errNotSeekable struct{}

func (errNotSeekable) Error() string { return "objstore: wrapped reader does not implement io.Seeker" }

// Seek delegates to the wrapped reader. It returns an error instead of
// panicking if the reader turns out not to be seekable.
func (n nopSeekerCloserWithObjectSize) Seek(offset int64, whence int) (int64, error) {
	s, ok := n.Reader.(io.Seeker)
	if !ok {
		return 0, errNotSeekable{}
	}
	return s.Seek(offset, whence)
}

// nopSeekerCloserWithSize adapts r into an io.ReadSeekCloser with a no-op
// Close. The caller must ensure r implements io.Seeker.
func nopSeekerCloserWithSize(r io.Reader) io.ReadSeekCloser {
	return nopSeekerCloserWithObjectSize{r}
}

// UploadDir uploads all files in srcdir into a top-level directory in the bucket
// named dstdir. It is the caller's responsibility to clean up a partial upload in case of failure.
func UploadDir(ctx context.Context, logger log.Logger, bkt Bucket, srcdir, dstdir string, options ...UploadOption) error {
Expand Down Expand Up @@ -595,8 +608,16 @@ func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) err
const op = OpUpload
b.ops.WithLabelValues(op).Inc()

_, ok := r.(io.Seeker)
var nopR io.ReadCloser
if ok {
nopR = nopSeekerCloserWithSize(r)
} else {
nopR = NopCloserWithSize(r)
}

trc := newTimingReadCloser(
NopCloserWithSize(r),
nopR,
op,
b.opsDuration,
b.opsFailures,
Expand Down
36 changes: 23 additions & 13 deletions objstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,29 @@ func TestTimingTracingReader(t *testing.T) {
testutil.Equals(t, int64(11), size)
}

// TestUploadKeepsSeekerObj verifies that a metrics-wrapped bucket passes a
// seekable reader through Upload without stripping the io.Seeker interface.
func TestUploadKeepsSeekerObj(t *testing.T) {
	reg := prometheus.NewRegistry()
	bkt := seekerTestBucket{
		Bucket: WrapWithMetrics(NewInMemBucket(), reg, ""),
	}

	err := bkt.Upload(context.Background(), "dir/obj1", bytes.NewReader([]byte("1")))
	testutil.Ok(t, err)
}

// seekerTestBucket implements Bucket and fails Upload if the supplied
// io.Reader is no longer seekable, i.e. some wrapper dropped io.Seeker.
type seekerTestBucket struct {
	Bucket
}

// Upload checks that r still implements io.Seeker; it does not store anything.
func (b seekerTestBucket) Upload(ctx context.Context, name string, r io.Reader) error {
	if _, ok := r.(io.Seeker); !ok {
		// Error string lowercased per Go convention (staticcheck ST1005).
		return errors.New("reader was supposed to be seekable")
	}

	return nil
}

func TestTimingTracingReaderSeeker(t *testing.T) {
m := WrapWithMetrics(NewInMemBucket(), nil, "")
r := bytes.NewReader([]byte("hello world"))
Expand Down Expand Up @@ -293,16 +316,3 @@ func (b unreliableBucket) Get(ctx context.Context, name string) (io.ReadCloser,
}
return b.Bucket.Get(ctx, name)
}

// nopSeekerCloserWithObjectSize wraps a seekable reader with a no-op Close
// and an ObjectSize derived from the underlying reader.
type nopSeekerCloserWithObjectSize struct{ io.Reader }

// Seek delegates to the wrapped reader. The previous stub returned (0, nil)
// without moving the offset, silently reporting success and corrupting reads
// for any caller that actually seeks.
func (n nopSeekerCloserWithObjectSize) Seek(offset int64, whence int) (int64, error) {
	return n.Reader.(io.Seeker).Seek(offset, whence)
}

// Close is a no-op; the wrapped reader owns any underlying resources.
func (nopSeekerCloserWithObjectSize) Close() error { return nil }

// ObjectSize reports the size of the wrapped content, if determinable.
func (n nopSeekerCloserWithObjectSize) ObjectSize() (int64, error) { return TryToGetSize(n.Reader) }

// nopSeekerCloserWithSize adapts a seekable io.Reader into an io.ReadSeekCloser.
func nopSeekerCloserWithSize(r io.Reader) io.ReadSeekCloser {
	return nopSeekerCloserWithObjectSize{r}
}

0 comments on commit 6ecabdd

Please sign in to comment.