Skip to content

Commit

Permalink
enhance progress bar
Browse files Browse the repository at this point in the history
  • Loading branch information
priyanshikhetwani committed Nov 19, 2024
1 parent 410c6d9 commit 0759f25
Showing 1 changed file with 52 additions and 55 deletions.
107 changes: 52 additions & 55 deletions pkg/client/s3client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import (
"github.com/IBM/ibm-cos-sdk-go/service/s3/s3manager"
"github.com/IBM/platform-services-go-sdk/resourcecontrollerv2"
"github.com/ppc64le-cloud/pvsadm/pkg"
"github.com/vbauerster/mpb"
"github.com/vbauerster/mpb/decor"
"github.com/vbauerster/mpb/v8"
"github.com/vbauerster/mpb/v8/decor"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
)
Expand Down Expand Up @@ -238,11 +238,12 @@ func (c *S3Client) CopyObjectToBucket(srcBucketName string, destBucketName strin
}

type CustomReader struct {
fp *os.File
size int64
read int64
signMap map[int64]struct{}
mux sync.Mutex
fp *os.File
size int64
read int64
signMap map[int64]struct{}
mux sync.Mutex
progresstracker ProgressTracker
}

type ProgressTracker struct {
Expand All @@ -257,13 +258,9 @@ type formattedCounter struct {
total int64
}

// Syncable implements decor.Decorator.
func (f *formattedCounter) Syncable() (bool, chan int) {
return false, nil
}

func (f *formattedCounter) Decor(stat *decor.Statistics) string {
return fmt.Sprintf("%s / %s", formatBytes(*f.read), formatBytes(f.total))
func (f *formattedCounter) Decor(stat decor.Statistics) (str string, viewWidth int) {
str = fmt.Sprintf("%s / %s", formatBytes(*f.read), formatBytes(f.total))
return str, len(str)
}

func (f *formattedCounter) Format(string) (_ string, width int) {
Expand All @@ -274,24 +271,40 @@ func (f *formattedCounter) Sync() (chan int, bool) {
return nil, false
}

func (r *CustomReader) Read(p []byte) (int, error) {
func (r *CustomReader) Read(p []byte) (n int, err error) {
// Read data from the file
n, err = r.fp.Read(p)
if err == nil {
// Update the read progress
r.read += int64(n)
if r.read > r.size {
r.read = r.size
}
r.progresstracker.bar.SetCurrent(r.read)
}
return r.fp.Read(p)
}

func (r *CustomReader) ReadAt(p []byte, off int64, tracker *ProgressTracker) (int, error) {
r.mux.Lock()
klog.Infof("readat")
defer r.mux.Unlock()
n, err := r.fp.ReadAt(p, off)
if err != nil {
if err == io.EOF {
return n, nil
}
return n, err
}
r.mux.Lock()
r.read += int64(n)
r.progresstracker.bar.SetCurrent(r.read) // Update the progress bar
if _, ok := r.signMap[off]; ok {
r.read += int64(n)
progress := int(float32(r.read*100) / float32(r.size))
fmt.Printf("\rUploading: Total read(bytes):%d progress:%d%%", r.read, progress)
// Update read progress
progress := int(float32(r.read*100) / float32(r.size)) // Calculate progress percentage
klog.Infof("\rUploading: Total read(bytes): %d progress: %d%%", r.read, progress)
} else {
r.signMap[off] = struct{}{}
}
r.mux.Unlock()
return n, nil
}

Expand Down Expand Up @@ -321,77 +334,61 @@ func (r *CustomReader) Seek(offset int64, whence int) (int64, error) {

// To upload a object to S3 bucket
func (c *S3Client) UploadObject(fileName, objectName, bucketName string) error {
klog.Infof("uploading the file %s", fileName)
//Read the content of the file
klog.Infof("Uploading the file %s", fileName)

file, err := os.Open(fileName)
if err != nil {
return fmt.Errorf("err opening file %s, err: %s", fileName, err)
return fmt.Errorf("error opening file %s: %v", fileName, err)
}
defer file.Close()

fileInfo, err := file.Stat()
if err != nil {
return fmt.Errorf("failed to stat file %v, err: %v", fileName, err)

return fmt.Errorf("failed to stat file %v: %v", fileName, err)
}

// Create the custom reader for file reading and progress tracking
reader := &CustomReader{
fp: file,
size: fileInfo.Size(),
signMap: map[int64]struct{}{},
}
progressTracker := &ProgressTracker{
progress: mpb.New(),
counter: &formattedCounter{read: new(int64), total: reader.size},
signMap: make(map[int64]struct{}),
}

buffer := make([]byte, 1024)
var off int64
progress := mpb.New()

progressTracker.bar = progressTracker.progress.AddBar(reader.size,
bar := progress.AddBar(reader.size,
mpb.PrependDecorators(
decor.Name("Uploading: ", decor.WC{W: 15}),
progressTracker.counter,
),
mpb.AppendDecorators(
decor.Percentage(),
),
)

for {
bytesRead, err := reader.ReadAt(buffer, off, progressTracker)
if err != nil {
if err == io.EOF {
break
}
klog.Errorf("An error occurred while reading file: %v", err)
break
}
if bytesRead < 1024 {
break
}
off += int64(bytesRead)
}
// Set the progress bar on the reader
reader.progresstracker.bar = bar

// klog.Infof("Operation completed successfully. Total time: %s", time.Since(start).Round(time.Second))
// Create an uploader with S3 client
// Create an S3 uploader with custom part size
uploader := s3manager.NewUploaderWithClient(c.S3Session, func(u *s3manager.Uploader) {
u.PartSize = 64 * 1024 * 1024
u.PartSize = 64 * 1024 * 1024 // 64MB part size
})

// Upload input parameters
upParams := &s3manager.UploadInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectName),
Body: reader,
Body: reader, // Using custom reader to read the file
}

// Perform an upload.
// Perform the upload
startTime := time.Now()
result, err := uploader.Upload(upParams)
if err != nil {
return err
return fmt.Errorf("upload failed: %v", err)
}
fmt.Println()

progress.Wait()

klog.Infof("Upload completed successfully in %s seconds to location %s", time.Since(startTime).Round(time.Second), result.Location)
return nil
}

0 comments on commit 0759f25

Please sign in to comment.