Skip to content

Commit

Permalink
Add support for object attributes in Iter call
Browse files Browse the repository at this point in the history
This commit adds support for passing in object attributes to the Iter
callback. This PR makes a breaking change to the API since it adds
a new parameter to the callback. The option is currently supported
for GCS and Filesystem buckets only, but can be extended to other
providers on demand.

An alternative implementation would be to add a new IterWithAttrs
method and implement it only for a subset of providers. This will
avoid a breaking change and will make the support of the feature
more explicit. The downside is we need a new interface for something
that is only an option and that could be supported for all providers
in the future.

Signed-off-by: Filip Petkovski <[email protected]>
  • Loading branch information
fpetkovski committed Oct 29, 2024
1 parent cfdd0e5 commit 6aa1b9b
Show file tree
Hide file tree
Showing 19 changed files with 150 additions and 71 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.17.0
github.com/prometheus/common v0.44.0
github.com/stretchr/testify v1.9.0
github.com/tencentyun/cos-go-sdk-v5 v0.7.40
go.opentelemetry.io/otel v1.24.0
go.opentelemetry.io/otel/trace v1.24.0
Expand Down Expand Up @@ -84,11 +85,11 @@ require (
github.com/mozillazg/go-httpheader v0.2.1 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/rs/xid v1.5.0 // indirect
github.com/sony/gobreaker v0.5.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
Expand All @@ -102,6 +103,7 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

require (
Expand Down
4 changes: 2 additions & 2 deletions inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (b *InMemBucket) Objects() map[string][]byte {

// Iter calls f for each entry in the given directory. The argument to f is the full
// object name including the prefix of the inspected directory.
func (b *InMemBucket) Iter(_ context.Context, dir string, f func(string) error, options ...IterOption) error {
func (b *InMemBucket) Iter(ctx context.Context, dir string, f func(name string, attrs ObjectAttributes) error, options ...IterOption) error {
unique := map[string]struct{}{}
params := ApplyIterOptions(options...)

Expand Down Expand Up @@ -99,7 +99,7 @@ func (b *InMemBucket) Iter(_ context.Context, dir string, f func(string) error,
})

for _, k := range keys {
if err := f(k); err != nil {
if err := f(k, EmptyObjectAttributes); err != nil {
return err
}
}
Expand Down
24 changes: 18 additions & 6 deletions objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ const (
OpAttributes = "attributes"
)

var EmptyObjectAttributes = ObjectAttributes{}

// Bucket provides read and write access to an object storage bucket.
// NOTE: We assume strong consistency for write-read flow.
type Bucket interface {
Expand Down Expand Up @@ -68,10 +70,12 @@ type InstrumentedBucket interface {

// BucketReader provides read access to an object storage bucket.
type BucketReader interface {
// Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full
// object name including the prefix of the inspected directory.
// Iter calls f for each entry in the given directory (not recursive.). The first argument to f is the full
// object name including the prefix of the inspected directory. The second argument are the object attributes
// returned by the underlying provider. Attributes can be requested using various IterOption's.
//
// Entries are passed to function in sorted order.
Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error
Iter(ctx context.Context, dir string, f func(name string, attrs ObjectAttributes) error, options ...IterOption) error

// Get returns a reader for the given object name.
Get(ctx context.Context, name string) (io.ReadCloser, error)
Expand Down Expand Up @@ -110,9 +114,17 @@ func WithRecursiveIter(params *IterParams) {
params.Recursive = true
}

// WithUpdatedAt is an option that can be applied to Iter() to
// return the updated time attribute of each object.
// This option is currently supported for the GCS and Filesystem providers.
func WithUpdatedAt(params *IterParams) {
params.WithUpdatedAt = true
}

// IterParams holds the Iter() parameters and is used by objstore clients implementations.
type IterParams struct {
Recursive bool
Recursive bool
WithUpdatedAt bool
}

func ApplyIterOptions(options ...IterOption) IterParams {
Expand Down Expand Up @@ -347,7 +359,7 @@ func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, origi
var downloadedFiles []string
var m sync.Mutex

err := bkt.Iter(ctx, src, func(name string) error {
err := bkt.Iter(ctx, src, func(name string, _ ObjectAttributes) error {
g.Go(func() error {
dst := filepath.Join(dst, filepath.Base(name))
if strings.HasSuffix(name, DirDelim) {
Expand Down Expand Up @@ -531,7 +543,7 @@ func (b *metricBucket) ReaderWithExpectedErrs(fn IsOpFailureExpectedFunc) Bucket
return b.WithExpectedErrs(fn)
}

func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string) error, options ...IterOption) error {
func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string, _ ObjectAttributes) error, options ...IterOption) error {
const op = OpIter
b.metrics.ops.WithLabelValues(op).Inc()

Expand Down
6 changes: 3 additions & 3 deletions prefixed_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ func (p *PrefixedBucket) Close() error {
// Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full
// object name including the prefix of the inspected directory.
// Entries are passed to function in sorted order.
func (p *PrefixedBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error {
func (p *PrefixedBucket) Iter(ctx context.Context, dir string, f func(name string, attrs ObjectAttributes) error, options ...IterOption) error {
pdir := withPrefix(p.prefix, dir)

return p.bkt.Iter(ctx, pdir, func(s string) error {
return f(strings.TrimPrefix(s, p.prefix+DirDelim))
return p.bkt.Iter(ctx, pdir, func(s string, _ ObjectAttributes) error {
return f(strings.TrimPrefix(s, p.prefix+DirDelim), EmptyObjectAttributes)
}, options...)
}

Expand Down
4 changes: 2 additions & 2 deletions prefixed_bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func UsesPrefixTest(t *testing.T, bkt Bucket, prefix string) {

testutil.Ok(t, bkt.Upload(context.Background(), strings.Trim(prefix, "/")+"/dir/file1.jpg", strings.NewReader("test-data1")))
seen := []string{}
testutil.Ok(t, pBkt.Iter(context.Background(), "", func(fn string) error {
testutil.Ok(t, pBkt.Iter(context.Background(), "", func(fn string, _ ObjectAttributes) error {
seen = append(seen, fn)
return nil
}, WithRecursiveIter))
Expand All @@ -81,7 +81,7 @@ func UsesPrefixTest(t *testing.T, bkt Bucket, prefix string) {
testutil.Equals(t, expected, seen)

seen = []string{}
testutil.Ok(t, pBkt.Iter(context.Background(), "", func(fn string) error {
testutil.Ok(t, pBkt.Iter(context.Background(), "", func(fn string, _ ObjectAttributes) error {
seen = append(seen, fn)
return nil
}))
Expand Down
8 changes: 4 additions & 4 deletions providers/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func NewBucketWithConfig(logger log.Logger, conf Config, component string, wrapR

// Iter calls f for each entry in the given directory. The argument to f is the full
// object name including the prefix of the inspected directory.
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
func (b *Bucket) Iter(ctx context.Context, dir string, f func(name string, _ objstore.ObjectAttributes) error, options ...objstore.IterOption) error {
prefix := dir
if prefix != "" && !strings.HasSuffix(prefix, DirDelim) {
prefix += DirDelim
Expand All @@ -211,7 +211,7 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt
return err
}
for _, blob := range resp.Segment.BlobItems {
if err := f(*blob.Name); err != nil {
if err := f(*blob.Name, objstore.EmptyObjectAttributes); err != nil {
return err
}
}
Expand All @@ -227,12 +227,12 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt
return err
}
for _, blobItem := range resp.Segment.BlobItems {
if err := f(*blobItem.Name); err != nil {
if err := f(*blobItem.Name, objstore.EmptyObjectAttributes); err != nil {
return err
}
}
for _, blobPrefix := range resp.Segment.BlobPrefixes {
if err := f(*blobPrefix.Name); err != nil {
if err := f(*blobPrefix.Name, objstore.EmptyObjectAttributes); err != nil {
return err
}
}
Expand Down
8 changes: 4 additions & 4 deletions providers/bos/bos.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) error {

// Iter calls f for each entry in the given directory (not recursive). The argument to f is the full
// object name including the prefix of the inspected directory.
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt ...objstore.IterOption) error {
func (b *Bucket) Iter(ctx context.Context, dir string, f func(name string, _ objstore.ObjectAttributes) error, opt ...objstore.IterOption) error {
if dir != "" {
dir = strings.TrimSuffix(dir, objstore.DirDelim) + objstore.DirDelim
}
Expand Down Expand Up @@ -207,13 +207,13 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt

marker = objects.NextMarker
for _, object := range objects.Contents {
if err := f(object.Key); err != nil {
if err := f(object.Key, objstore.EmptyObjectAttributes); err != nil {
return err
}
}

for _, object := range objects.CommonPrefixes {
if err := f(object.Prefix); err != nil {
if err := f(object.Prefix, objstore.EmptyObjectAttributes); err != nil {
return err
}
}
Expand Down Expand Up @@ -353,7 +353,7 @@ func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) {
return nil, nil, err
}

if err := b.Iter(context.Background(), "", func(f string) error {
if err := b.Iter(context.Background(), "", func(f string, _ objstore.ObjectAttributes) error {
return errors.Errorf("bucket %s is not empty", c.Bucket)
}); err != nil {
return nil, nil, errors.Wrapf(err, "checking bucket %s", c.Bucket)
Expand Down
6 changes: 3 additions & 3 deletions providers/cos/cos.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (b *Bucket) Delete(ctx context.Context, name string) error {

// Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full
// object name including the prefix of the inspected directory.
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
func (b *Bucket) Iter(ctx context.Context, dir string, f func(name string, attrs objstore.ObjectAttributes) error, options ...objstore.IterOption) error {
if dir != "" {
dir = strings.TrimSuffix(dir, dirDelim) + dirDelim
}
Expand All @@ -290,7 +290,7 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt
if object.key == "" {
continue
}
if err := f(object.key); err != nil {
if err := f(object.key, objstore.EmptyObjectAttributes); err != nil {
return err
}
}
Expand Down Expand Up @@ -493,7 +493,7 @@ func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) {
return nil, nil, err
}

if err := b.Iter(context.Background(), "", func(f string) error {
if err := b.Iter(context.Background(), "", func(f string, _ objstore.ObjectAttributes) error {
return errors.Errorf("bucket %s is not empty", c.Bucket)
}); err != nil {
return nil, nil, errors.Wrapf(err, "cos check bucket %s", c.Bucket)
Expand Down
15 changes: 12 additions & 3 deletions providers/filesystem/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func NewBucket(rootDir string) (*Bucket, error) {

// Iter calls f for each entry in the given directory. The argument to f is the full
// object name including the prefix of the inspected directory.
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
func (b *Bucket) Iter(ctx context.Context, dir string, f func(name string, attrs objstore.ObjectAttributes) error, options ...objstore.IterOption) error {
if ctx.Err() != nil {
return ctx.Err()
}
Expand All @@ -75,7 +75,7 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt
return err
}
for _, file := range files {
name := filepath.Join(dir, file.Name())
name := filepath.Join(absDir, file.Name())

if file.IsDir() {
empty, err := isDirEmpty(filepath.Join(absDir, file.Name()))
Expand All @@ -101,7 +101,16 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt
continue
}
}
if err := f(name); err != nil {

attrs := objstore.EmptyObjectAttributes
if params.WithUpdatedAt {
stat, err := os.Stat(name)
if err != nil {
return errors.Wrapf(err, "unable stat %s", name)
}
attrs.LastModified = stat.ModTime()
}
if err := f(name, attrs); err != nil {
return err
}
}
Expand Down
56 changes: 55 additions & 1 deletion providers/filesystem/filesystem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@ package filesystem
import (
"bytes"
"context"
"os"
"strings"
"sync"
"testing"

"github.com/efficientgo/core/testutil"
"github.com/stretchr/testify/require"

"github.com/thanos-io/objstore"
)

func TestDelete_EmptyDirDeletionRaceCondition(t *testing.T) {
Expand Down Expand Up @@ -53,14 +57,64 @@ func TestIter_CancelledContext(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()

err = b.Iter(ctx, "", func(s string) error {
err = b.Iter(ctx, "", func(s string, _ objstore.ObjectAttributes) error {
return nil
})

testutil.NotOk(t, err)
testutil.Equals(t, context.Canceled, err)
}

func TestIter(t *testing.T) {
dir := t.TempDir()
f, err := os.CreateTemp(dir, "test")
require.NoError(t, err)
defer f.Close()

stat, err := f.Stat()
require.NoError(t, err)

cases := []struct {
name string
opts []objstore.IterOption
expectedAttrs objstore.ObjectAttributes
}{
{
name: "no options",
opts: nil,
expectedAttrs: objstore.EmptyObjectAttributes,
},
{
name: "with updated at",
opts: []objstore.IterOption{
objstore.WithUpdatedAt,
},
expectedAttrs: objstore.ObjectAttributes{
LastModified: stat.ModTime(),
},
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
b, err := NewBucket(dir)
testutil.Ok(t, err)

var attrs objstore.ObjectAttributes

ctx := context.Background()
err = b.Iter(ctx, "", func(s string, objAttrs objstore.ObjectAttributes) error {
attrs = objAttrs
return nil
}, tc.opts...)

testutil.Ok(t, err)
testutil.Equals(t, tc.expectedAttrs, attrs)
})

}
}

func TestGet_CancelledContext(t *testing.T) {
b, err := NewBucket(t.TempDir())
testutil.Ok(t, err)
Expand Down
Loading

0 comments on commit 6aa1b9b

Please sign in to comment.