Skip to content

Commit

Permalink
refactor(general): minor cleanups (kopia#4003)
Browse files Browse the repository at this point in the history
Followups to kopia#3655

* wrap fs.Reader
* nit: remove unnecessary intermediate variable
* nit: rename local variable
* cleanup: move restore.Progress interface to cli pkg
* move cliRestoreProgress to a separate file
* refactor(general): replace switch with if/else for clarity
  Removes a tautology for `err == nil`, which was guaranteed
  to be true in the second case statement for the switch.
  Replacing the switch statement with and if/else block is clearer.
* initialize restoreProgress in restore command
* fix: use error.Wrapf with format string and args


Simplify SetCounters signature:

Pass arguments in a `restore.Stats` struct.
  `SetCounters(s restore.Stats)`
Simplifies call sites and implementation.
In this case it makes sense to pass all the values
using the restore.Stats struct as it simplifies
the calls.
However, this pattern should be avoided in general
as it essentially makes all the arguments "optional".
This makes it easy to miss setting a value and simply
passing 0 (the default value), thus it becomes error
prone.
In this particular case, the struct is being passed
through verbatim, thus eliminating the risk of
missing a value, at least in the current state of
the code.
  • Loading branch information
julio-lopez authored Aug 27, 2024
1 parent d37de83 commit 5dbc8a4
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 199 deletions.
16 changes: 5 additions & 11 deletions cli/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/logging"
"github.com/kopia/kopia/repo/maintenance"
"github.com/kopia/kopia/snapshot/restore"
"github.com/kopia/kopia/snapshot/snapshotmaintenance"
)

Expand Down Expand Up @@ -87,7 +86,7 @@ type appServices interface {
advancedCommand(ctx context.Context)
repositoryConfigFileName() string
getProgress() *cliProgress
getRestoreProgress() restore.Progress
getRestoreProgress() RestoreProgress

stdout() io.Writer
Stderr() io.Writer
Expand Down Expand Up @@ -120,7 +119,7 @@ type App struct {
enableAutomaticMaintenance bool
pf profileFlags
progress *cliProgress
restoreProgress restore.Progress
restoreProgress RestoreProgress
initialUpdateCheckDelay time.Duration
updateCheckInterval time.Duration
updateAvailableNotifyInterval time.Duration
Expand Down Expand Up @@ -186,11 +185,11 @@ func (c *App) getProgress() *cliProgress {
}

// SetRestoreProgress is used to set custom restore progress, purposed to be used in tests.
func (c *App) SetRestoreProgress(p restore.Progress) {
func (c *App) SetRestoreProgress(p RestoreProgress) {
c.restoreProgress = p
}

func (c *App) getRestoreProgress() restore.Progress {
func (c *App) getRestoreProgress() RestoreProgress {
return c.restoreProgress
}

Expand Down Expand Up @@ -293,10 +292,6 @@ func (c *App) setup(app *kingpin.Application) {
c.pf.setup(app)
c.progress.setup(c, app)

if rp, ok := c.restoreProgress.(*cliRestoreProgress); ok {
rp.setup(c, app)
}

c.blob.setup(c, app)
c.benchmark.setup(c, app)
c.cache.setup(c, app)
Expand Down Expand Up @@ -325,8 +320,7 @@ type commandParent interface {
// NewApp creates a new instance of App.
func NewApp() *App {
return &App{
progress: &cliProgress{},
restoreProgress: &cliRestoreProgress{},
progress: &cliProgress{},
cliStorageProviders: []StorageProvider{
{"from-config", "the provided configuration file", func() StorageFlags { return &storageFromConfigFlags{} }},

Expand Down
120 changes: 0 additions & 120 deletions cli/cli_progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,124 +259,4 @@ func (p *cliProgress) Finish() {
}
}

type cliRestoreProgress struct {
restoredCount atomic.Int32
enqueuedCount atomic.Int32
skippedCount atomic.Int32
ignoredErrorsCount atomic.Int32

restoredTotalFileSize atomic.Int64
enqueuedTotalFileSize atomic.Int64
skippedTotalFileSize atomic.Int64

progressUpdateInterval time.Duration
enableProgress bool

svc appServices
outputThrottle timetrack.Throttle
outputMutex sync.Mutex
out textOutput
eta timetrack.Estimator

// +checklocks:outputMutex
lastLineLength int
}

func (p *cliRestoreProgress) setup(svc appServices, _ *kingpin.Application) {
cp := svc.getProgress()
if cp == nil {
return
}

p.progressUpdateInterval = cp.progressUpdateInterval
p.enableProgress = cp.enableProgress
p.out = cp.out
p.svc = svc

p.eta = timetrack.Start()
}

func (p *cliRestoreProgress) SetCounters(
enqueuedCount, restoredCount, skippedCount, ignoredErrors int32,
enqueuedBytes, restoredBytes, skippedBytes int64,
) {
p.enqueuedCount.Store(enqueuedCount)
p.enqueuedTotalFileSize.Store(enqueuedBytes)

p.restoredCount.Store(restoredCount)
p.restoredTotalFileSize.Store(restoredBytes)

p.skippedCount.Store(skippedCount)
p.skippedTotalFileSize.Store(skippedBytes)

p.ignoredErrorsCount.Store(ignoredErrors)

p.maybeOutput()
}

func (p *cliRestoreProgress) Flush() {
p.outputThrottle.Reset()
p.output("\n")
}

func (p *cliRestoreProgress) maybeOutput() {
if p.outputThrottle.ShouldOutput(p.svc.getProgress().progressUpdateInterval) {
p.output("")
}
}

func (p *cliRestoreProgress) output(suffix string) {
if !p.svc.getProgress().enableProgress {
return
}

p.outputMutex.Lock()
defer p.outputMutex.Unlock()

restoredCount := p.restoredCount.Load()
enqueuedCount := p.enqueuedCount.Load()
skippedCount := p.skippedCount.Load()
ignoredCount := p.ignoredErrorsCount.Load()

restoredSize := p.restoredTotalFileSize.Load()
enqueuedSize := p.enqueuedTotalFileSize.Load()
skippedSize := p.skippedTotalFileSize.Load()

if restoredSize == 0 {
return
}

var maybeRemaining, maybeSkipped, maybeErrors string
if est, ok := p.eta.Estimate(float64(restoredSize), float64(enqueuedSize)); ok {
maybeRemaining = fmt.Sprintf(" %v (%.1f%%) remaining %v",
units.BytesPerSecondsString(est.SpeedPerSecond),
est.PercentComplete,
est.Remaining)
}

if skippedCount > 0 {
maybeSkipped = fmt.Sprintf(", skipped %v (%v)", skippedCount, units.BytesString(skippedSize))
}

if ignoredCount > 0 {
maybeErrors = fmt.Sprintf(", ignored %v errors", ignoredCount)
}

line := fmt.Sprintf("Processed %v (%v) of %v (%v)%v%v%v.",
restoredCount+skippedCount, units.BytesString(restoredSize),
enqueuedCount, units.BytesString(enqueuedSize),
maybeSkipped, maybeErrors, maybeRemaining,
)

var extraSpaces string

if len(line) < p.lastLineLength {
// add extra spaces to wipe over previous line if it was longer than current
extraSpaces = strings.Repeat(" ", p.lastLineLength-len(line))
}

p.lastLineLength = len(line)
p.out.printStderr("\r%v%v%v", line, extraSpaces, suffix)
}

var _ snapshotfs.UploadProgress = (*cliProgress)(nil)
34 changes: 24 additions & 10 deletions cli/command_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/fs/localfs"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/timetrack"
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/object"
Expand Down Expand Up @@ -95,6 +96,12 @@ followed by the path of the directory for the contents to be restored.
unlimitedDepth = math.MaxInt32
)

// RestoreProgress is invoked to report progress during a restore.
type RestoreProgress interface {
SetCounters(s restore.Stats)
Flush()
}

type restoreSourceTarget struct {
source string
target string
Expand Down Expand Up @@ -366,6 +373,21 @@ func (c *commandRestore) setupPlaceholderExpansion(ctx context.Context, rep repo
return rootEntry, nil
}

func (c *commandRestore) getRestoreProgress() RestoreProgress {
if rp := c.svc.getRestoreProgress(); rp != nil {
return rp
}

pf := c.svc.getProgress().progressFlags

return &cliRestoreProgress{
enableProgress: pf.enableProgress,
out: pf.out,
progressUpdateInterval: pf.progressUpdateInterval,
eta: timetrack.Start(),
}
}

func (c *commandRestore) run(ctx context.Context, rep repo.Repository) error {
output, oerr := c.restoreOutput(ctx, rep)
if oerr != nil {
Expand Down Expand Up @@ -396,17 +418,9 @@ func (c *commandRestore) run(ctx context.Context, rep repo.Repository) error {
rootEntry = re
}

restoreProgress := c.svc.getRestoreProgress()
restoreProgress := c.getRestoreProgress()
progressCallback := func(ctx context.Context, stats restore.Stats) {
restoreProgress.SetCounters(
stats.EnqueuedFileCount+stats.EnqueuedDirCount+stats.EnqueuedSymlinkCount,
stats.RestoredFileCount+stats.RestoredDirCount+stats.RestoredSymlinkCount,
stats.SkippedCount,
stats.IgnoredErrorCount,
stats.EnqueuedTotalFileSize,
stats.RestoredTotalFileSize,
stats.SkippedTotalFileSize,
)
restoreProgress.SetCounters(stats)
}

st, err := restore.Entry(ctx, rep, output, rootEntry, restore.Options{
Expand Down
116 changes: 116 additions & 0 deletions cli/restore_progress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package cli

import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/kopia/kopia/internal/timetrack"
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/snapshot/restore"
)

type cliRestoreProgress struct {
restoredCount atomic.Int32
enqueuedCount atomic.Int32
skippedCount atomic.Int32
ignoredErrorsCount atomic.Int32

restoredTotalFileSize atomic.Int64
enqueuedTotalFileSize atomic.Int64
skippedTotalFileSize atomic.Int64

progressUpdateInterval time.Duration
enableProgress bool

outputThrottle timetrack.Throttle
outputMutex sync.Mutex
out textOutput // +checklocksignore: outputMutex just happens to be held always.
eta timetrack.Estimator // +checklocksignore: outputMutex just happens to be held always.

// +checklocks:outputMutex
lastLineLength int
}

func (p *cliRestoreProgress) SetCounters(s restore.Stats) {
p.enqueuedCount.Store(s.EnqueuedFileCount + s.EnqueuedDirCount + s.EnqueuedSymlinkCount)
p.enqueuedTotalFileSize.Store(s.EnqueuedTotalFileSize)

p.restoredCount.Store(s.RestoredFileCount + s.RestoredDirCount + s.RestoredSymlinkCount)
p.restoredTotalFileSize.Store(s.RestoredTotalFileSize)

p.skippedCount.Store(s.SkippedCount)
p.skippedTotalFileSize.Store(s.SkippedTotalFileSize)

p.ignoredErrorsCount.Store(s.IgnoredErrorCount)

p.maybeOutput()
}

func (p *cliRestoreProgress) Flush() {
p.outputThrottle.Reset()
p.output("\n")
}

func (p *cliRestoreProgress) maybeOutput() {
if p.outputThrottle.ShouldOutput(p.progressUpdateInterval) {
p.output("")
}
}

func (p *cliRestoreProgress) output(suffix string) {
if !p.enableProgress {
return
}

// ensure the counters are not going back in an output line compared to the previous one
p.outputMutex.Lock()
defer p.outputMutex.Unlock()

restoredCount := p.restoredCount.Load()
enqueuedCount := p.enqueuedCount.Load()
skippedCount := p.skippedCount.Load()
ignoredCount := p.ignoredErrorsCount.Load()

restoredSize := p.restoredTotalFileSize.Load()
enqueuedSize := p.enqueuedTotalFileSize.Load()
skippedSize := p.skippedTotalFileSize.Load()

if restoredSize == 0 {
return
}

var maybeRemaining, maybeSkipped, maybeErrors string
if est, ok := p.eta.Estimate(float64(restoredSize), float64(enqueuedSize)); ok {
maybeRemaining = fmt.Sprintf(" %v (%.1f%%) remaining %v",
units.BytesPerSecondsString(est.SpeedPerSecond),
est.PercentComplete,
est.Remaining)
}

if skippedCount > 0 {
maybeSkipped = fmt.Sprintf(", skipped %v (%v)", skippedCount, units.BytesString(skippedSize))
}

if ignoredCount > 0 {
maybeErrors = fmt.Sprintf(", ignored %v errors", ignoredCount)
}

line := fmt.Sprintf("Processed %v (%v) of %v (%v)%v%v%v.",
restoredCount+skippedCount, units.BytesString(restoredSize),
enqueuedCount, units.BytesString(enqueuedSize),
maybeSkipped, maybeErrors, maybeRemaining,
)

var extraSpaces string

if len(line) < p.lastLineLength {
// add extra spaces to wipe over previous line if it was longer than current
extraSpaces = strings.Repeat(" ", p.lastLineLength-len(line))
}

p.lastLineLength = len(line)
p.out.printStderr("\r%v%v%v", line, extraSpaces, suffix)
}
Loading

0 comments on commit 5dbc8a4

Please sign in to comment.