Skip to content

Commit

Permalink
Track object upload status through a progress bar
Browse files Browse the repository at this point in the history
  • Loading branch information
priyanshikhetwani committed Dec 2, 2024
1 parent 149a34b commit 40d0bb1
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 16 deletions.
1 change: 1 addition & 0 deletions cmd/image/sync/mock/sync_client_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ require (
)

require (
github.com/VividCortex/ewma v1.2.0 // indirect
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
github.com/atotto/clipboard v0.1.4 // indirect
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
Expand Down Expand Up @@ -83,6 +85,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/vbauerster/mpb/v8 v8.8.3
go.mongodb.org/mongo-driver v1.16.0 // indirect
go.opentelemetry.io/otel v1.17.0 // indirect
go.opentelemetry.io/otel/metric v1.17.0 // indirect
Expand Down
8 changes: 6 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ github.com/MakeNowJust/heredoc v1.0.0/go.mod h1:mG5amYoWBHf8vpLOuehzbGGw0EHxpZZ6
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/OpenPeeDeeP/depguard v1.0.1/go.mod h1:xsIw86fROiiwelg+jB2uM9PiKihMMmUx/1V+TNhjQvM=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8=
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/apparentlymart/go-cidr v1.1.0 h1:2mAhrMoF+nhXqxTzSZMUzDHkLjmIHC+Zzn4tdgBZjnU=
Expand Down Expand Up @@ -354,6 +358,8 @@ github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyC
github.com/valyala/fasthttp v1.2.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s=
github.com/valyala/quicktemplate v1.2.0/go.mod h1:EH+4AkTd43SvgIbQHYu59/cJyxDoOVRUAfrukLPuGJ4=
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/vbauerster/mpb/v8 v8.8.3 h1:dTOByGoqwaTJYPubhVz3lO5O6MK553XVgUo33LdnNsQ=
github.com/vbauerster/mpb/v8 v8.8.3/go.mod h1:JfCCrtcMsJwP6ZwMn9e5LMnNyp3TVNpUWWkN+nd4EWk=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down Expand Up @@ -382,8 +388,6 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw=
golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw=
golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
Expand Down
103 changes: 89 additions & 14 deletions pkg/client/s3client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package client
import (
"errors"
"fmt"
"io"
"os"
"regexp"
"sync"
Expand All @@ -30,6 +31,9 @@ import (
"github.com/IBM/ibm-cos-sdk-go/service/s3"
"github.com/IBM/ibm-cos-sdk-go/service/s3/s3manager"
"github.com/IBM/platform-services-go-sdk/resourcecontrollerv2"
"github.com/vbauerster/mpb/v8"
"github.com/vbauerster/mpb/v8/decor"

"github.com/ppc64le-cloud/pvsadm/pkg"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
Expand Down Expand Up @@ -235,11 +239,37 @@ func (c *S3Client) CopyObjectToBucket(srcBucketName string, destBucketName strin
}

type CustomReader struct {
fp *os.File
size int64
read int64
signMap map[int64]struct{}
mux sync.Mutex
fp *os.File
size int64
read int64
mux sync.Mutex
progresstracker *ProgressTracker
signMap map[int64]struct{}
}

type ProgressTracker struct {
progress *mpb.Progress
bar *mpb.Bar
isBarSet bool
counter *formattedCounter
}

type formattedCounter struct {
read *int64
total int64
}

func (f *formattedCounter) Decor(stat decor.Statistics) (string, int) {
str := fmt.Sprintf("%s/%s", formatBytes(*f.read), formatBytes(f.total))
return str, len(str)
}

func (f *formattedCounter) Format(string) (string, int) {
return "", 0
}

func (f *formattedCounter) Sync() (chan int, bool) {
return nil, false
}

func (r *CustomReader) Read(p []byte) (int, error) {
Expand All @@ -249,28 +279,51 @@ func (r *CustomReader) Read(p []byte) (int, error) {
func (r *CustomReader) ReadAt(p []byte, off int64) (int, error) {
n, err := r.fp.ReadAt(p, off)
if err != nil {
if err == io.EOF {
return n, nil
}
return n, err
}
r.mux.Lock()
if _, ok := r.signMap[off]; ok {
r.read += int64(n)
progress := int(float32(r.read*100) / float32(r.size))
fmt.Printf("\rUploading: Total read(bytes):%d progress:%d%%", r.read, progress)
r.progresstracker.counter.read = &r.read
} else {
r.signMap[off] = struct{}{}
}
r.progresstracker.bar.SetCurrent(r.read)
r.mux.Unlock()
return n, nil
}

// Format the bytes to a human-readable string
func formatBytes(size int64) string {
const (
KB = 1024
MB = KB * 1024
GB = MB * 1024
)

switch {
case size >= GB:
return fmt.Sprintf("%.2f GB", float64(size)/GB)
case size >= MB:
return fmt.Sprintf("%.2f MB", float64(size)/MB)
case size >= KB:
return fmt.Sprintf("%.2f KB", float64(size)/KB)
default:
return fmt.Sprintf("%d Bytes", size)
}
}

func (r *CustomReader) Seek(offset int64, whence int) (int64, error) {
return r.fp.Seek(offset, whence)
}

// To upload a object to S3 bucket
func (c *S3Client) UploadObject(fileName, objectName, bucketName string) error {
klog.Infof("uploading the file %s", fileName)
//Read the content of the file
klog.Infof("Uploading the file %s", fileName)
// Read the content of the file
file, err := os.Open(fileName)
if err != nil {
return fmt.Errorf("err opening file %s, err: %s", fileName, err)
Expand All @@ -280,13 +333,33 @@ func (c *S3Client) UploadObject(fileName, objectName, bucketName string) error {
fileInfo, err := file.Stat()
if err != nil {
return fmt.Errorf("failed to stat file %v, err: %v", fileName, err)

}

// Create the custom reader for file reading and progress tracking
reader := &CustomReader{
fp: file,
size: fileInfo.Size(),
signMap: map[int64]struct{}{},
}

// Initialize progress tracker
progressTracker := &ProgressTracker{
progress: mpb.New(),
counter: &formattedCounter{read: new(int64), total: reader.size},
}

bar := progressTracker.progress.AddBar(reader.size,
mpb.PrependDecorators(
decor.Name("Uploading: ", decor.WC{W: 15}),
progressTracker.counter,
),
mpb.AppendDecorators(
decor.Percentage(),
),
)
reader.progresstracker = progressTracker
reader.progresstracker.bar = bar

// Create an uploader with S3 client
uploader := s3manager.NewUploaderWithClient(c.S3Session, func(u *s3manager.Uploader) {
u.PartSize = 64 * 1024 * 1024
Expand All @@ -299,13 +372,15 @@ func (c *S3Client) UploadObject(fileName, objectName, bucketName string) error {
Body: reader,
}

// Perform an upload.
// Perform an upload
startTime := time.Now()
result, err := uploader.Upload(upParams)
if err != nil {
return err
return fmt.Errorf("upload failed: %v", err)
}
fmt.Println()
klog.Infof("Upload completed successfully in %s seconds to location %s", time.Since(startTime).Round(time.Second), result.Location)

progressTracker.progress.Wait()

klog.Infof("Upload completed successfully in %s to location %s", time.Since(startTime).Round(time.Second), result.Location)
return nil
}

0 comments on commit 40d0bb1

Please sign in to comment.