diff --git a/pkg/s3/client.go b/pkg/s3/client.go index e720d41..7a1b1ec 100644 --- a/pkg/s3/client.go +++ b/pkg/s3/client.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "net/url" + "sync/atomic" "github.com/golang/glog" "github.com/minio/minio-go/v7" @@ -170,11 +171,11 @@ func (client *s3Client) removeObjects(bucketName, prefix string) error { // will delete files one by one without file lock func (client *s3Client) removeObjectsOneByOne(bucketName, prefix string) error { parallelism := 16 - objectsCh := make(chan minio.ObjectInfo, 1) + objectsCh := make(chan minio.ObjectInfo, parallelism) guardCh := make(chan int, parallelism) var listErr error - totalObjects := 0 - removeErrors := 0 + var totalObjects int64 = 0 + var removeErrors int64 = 0 go func() { defer close(objectsCh) @@ -185,7 +186,7 @@ func (client *s3Client) removeObjectsOneByOne(bucketName, prefix string) error { listErr = object.Err return } - totalObjects++ + atomic.AddInt64(&totalObjects, 1) objectsCh <- object } }() @@ -197,15 +198,15 @@ func (client *s3Client) removeObjectsOneByOne(bucketName, prefix string) error { for object := range objectsCh { guardCh <- 1 - go func() { - err := client.minio.RemoveObject(client.ctx, bucketName, object.Key, - minio.RemoveObjectOptions{VersionID: object.VersionID}) + go func(obj minio.ObjectInfo) { + err := client.minio.RemoveObject(client.ctx, bucketName, obj.Key, + minio.RemoveObjectOptions{VersionID: obj.VersionID}) if err != nil { - glog.Errorf("Failed to remove object %s, error: %s", object.Key, err) - removeErrors++ + glog.Errorf("Failed to remove object %s, error: %s", obj.Key, err) + atomic.AddInt64(&removeErrors, 1) } <- guardCh - }() + }(object) } for i := 0; i < parallelism; i++ { guardCh <- 1