diff --git a/cli/app.go b/cli/app.go index 755c34a31e0..cec3e79f198 100644 --- a/cli/app.go +++ b/cli/app.go @@ -23,7 +23,6 @@ import ( "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/logging" "github.com/kopia/kopia/repo/maintenance" - "github.com/kopia/kopia/snapshot/restore" "github.com/kopia/kopia/snapshot/snapshotmaintenance" ) @@ -87,7 +86,7 @@ type appServices interface { advancedCommand(ctx context.Context) repositoryConfigFileName() string getProgress() *cliProgress - getRestoreProgress() restore.Progress + getRestoreProgress() RestoreProgress stdout() io.Writer Stderr() io.Writer @@ -120,7 +119,7 @@ type App struct { enableAutomaticMaintenance bool pf profileFlags progress *cliProgress - restoreProgress restore.Progress + restoreProgress RestoreProgress initialUpdateCheckDelay time.Duration updateCheckInterval time.Duration updateAvailableNotifyInterval time.Duration @@ -186,11 +185,11 @@ func (c *App) getProgress() *cliProgress { } // SetRestoreProgress is used to set custom restore progress, purposed to be used in tests. -func (c *App) SetRestoreProgress(p restore.Progress) { +func (c *App) SetRestoreProgress(p RestoreProgress) { c.restoreProgress = p } -func (c *App) getRestoreProgress() restore.Progress { +func (c *App) getRestoreProgress() RestoreProgress { return c.restoreProgress } @@ -293,10 +292,6 @@ func (c *App) setup(app *kingpin.Application) { c.pf.setup(app) c.progress.setup(c, app) - if rp, ok := c.restoreProgress.(*cliRestoreProgress); ok { - rp.setup(c, app) - } - c.blob.setup(c, app) c.benchmark.setup(c, app) c.cache.setup(c, app) @@ -325,8 +320,7 @@ type commandParent interface { // NewApp creates a new instance of App. func NewApp() *App { return &App{ - progress: &cliProgress{}, - restoreProgress: &cliRestoreProgress{}, + progress: &cliProgress{}, cliStorageProviders: []StorageProvider{ {"from-config", "the provided configuration file", func() StorageFlags { return &storageFromConfigFlags{} }}, diff --git a/cli/cli_progress.go b/cli/cli_progress.go index ec4236624a4..fa5fda5d3df 100644 --- a/cli/cli_progress.go +++ b/cli/cli_progress.go @@ -259,124 +259,4 @@ func (p *cliProgress) Finish() { } } -type cliRestoreProgress struct { - restoredCount atomic.Int32 - enqueuedCount atomic.Int32 - skippedCount atomic.Int32 - ignoredErrorsCount atomic.Int32 - - restoredTotalFileSize atomic.Int64 - enqueuedTotalFileSize atomic.Int64 - skippedTotalFileSize atomic.Int64 - - progressUpdateInterval time.Duration - enableProgress bool - - svc appServices - outputThrottle timetrack.Throttle - outputMutex sync.Mutex - out textOutput - eta timetrack.Estimator - - // +checklocks:outputMutex - lastLineLength int -} - -func (p *cliRestoreProgress) setup(svc appServices, _ *kingpin.Application) { - cp := svc.getProgress() - if cp == nil { - return - } - - p.progressUpdateInterval = cp.progressUpdateInterval - p.enableProgress = cp.enableProgress - p.out = cp.out - p.svc = svc - - p.eta = timetrack.Start() -} - -func (p *cliRestoreProgress) SetCounters( - enqueuedCount, restoredCount, skippedCount, ignoredErrors int32, - enqueuedBytes, restoredBytes, skippedBytes int64, -) { - p.enqueuedCount.Store(enqueuedCount) - p.enqueuedTotalFileSize.Store(enqueuedBytes) - - p.restoredCount.Store(restoredCount) - p.restoredTotalFileSize.Store(restoredBytes) - - p.skippedCount.Store(skippedCount) - p.skippedTotalFileSize.Store(skippedBytes) - - p.ignoredErrorsCount.Store(ignoredErrors) - - p.maybeOutput() -} - -func (p *cliRestoreProgress) Flush() { - p.outputThrottle.Reset() - p.output("\n") -} - -func (p *cliRestoreProgress) maybeOutput() { - if p.outputThrottle.ShouldOutput(p.svc.getProgress().progressUpdateInterval) { - p.output("") - } -} - -func (p *cliRestoreProgress) output(suffix string) { - if !p.svc.getProgress().enableProgress { - return - } - - p.outputMutex.Lock() - defer p.outputMutex.Unlock() - - restoredCount := p.restoredCount.Load() - enqueuedCount := p.enqueuedCount.Load() - skippedCount := p.skippedCount.Load() - ignoredCount := p.ignoredErrorsCount.Load() - - restoredSize := p.restoredTotalFileSize.Load() - enqueuedSize := p.enqueuedTotalFileSize.Load() - skippedSize := p.skippedTotalFileSize.Load() - - if restoredSize == 0 { - return - } - - var maybeRemaining, maybeSkipped, maybeErrors string - if est, ok := p.eta.Estimate(float64(restoredSize), float64(enqueuedSize)); ok { - maybeRemaining = fmt.Sprintf(" %v (%.1f%%) remaining %v", - units.BytesPerSecondsString(est.SpeedPerSecond), - est.PercentComplete, - est.Remaining) - } - - if skippedCount > 0 { - maybeSkipped = fmt.Sprintf(", skipped %v (%v)", skippedCount, units.BytesString(skippedSize)) - } - - if ignoredCount > 0 { - maybeErrors = fmt.Sprintf(", ignored %v errors", ignoredCount) - } - - line := fmt.Sprintf("Processed %v (%v) of %v (%v)%v%v%v.", - restoredCount+skippedCount, units.BytesString(restoredSize), - enqueuedCount, units.BytesString(enqueuedSize), - maybeSkipped, maybeErrors, maybeRemaining, - ) - - var extraSpaces string - - if len(line) < p.lastLineLength { - // add extra spaces to wipe over previous line if it was longer than current - extraSpaces = strings.Repeat(" ", p.lastLineLength-len(line)) - } - - p.lastLineLength = len(line) - p.out.printStderr("\r%v%v%v", line, extraSpaces, suffix) -} - var _ snapshotfs.UploadProgress = (*cliProgress)(nil) diff --git a/cli/command_restore.go b/cli/command_restore.go index 198f2ea4336..4fe24e624a8 100644 --- a/cli/command_restore.go +++ b/cli/command_restore.go @@ -18,6 +18,7 @@ import ( "github.com/kopia/kopia/fs" "github.com/kopia/kopia/fs/localfs" "github.com/kopia/kopia/internal/clock" + "github.com/kopia/kopia/internal/timetrack" "github.com/kopia/kopia/internal/units" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/object" @@ -95,6 +96,12 @@ followed by the path of the directory for the contents to be restored. unlimitedDepth = math.MaxInt32 ) +// RestoreProgress is invoked to report progress during a restore. +type RestoreProgress interface { + SetCounters(s restore.Stats) + Flush() +} + type restoreSourceTarget struct { source string target string @@ -366,6 +373,21 @@ func (c *commandRestore) setupPlaceholderExpansion(ctx context.Context, rep repo return rootEntry, nil } +func (c *commandRestore) getRestoreProgress() RestoreProgress { + if rp := c.svc.getRestoreProgress(); rp != nil { + return rp + } + + pf := c.svc.getProgress().progressFlags + + return &cliRestoreProgress{ + enableProgress: pf.enableProgress, + out: pf.out, + progressUpdateInterval: pf.progressUpdateInterval, + eta: timetrack.Start(), + } +} + func (c *commandRestore) run(ctx context.Context, rep repo.Repository) error { output, oerr := c.restoreOutput(ctx, rep) if oerr != nil { @@ -396,17 +418,9 @@ func (c *commandRestore) run(ctx context.Context, rep repo.Repository) error { rootEntry = re } - restoreProgress := c.svc.getRestoreProgress() + restoreProgress := c.getRestoreProgress() progressCallback := func(ctx context.Context, stats restore.Stats) { - restoreProgress.SetCounters( - stats.EnqueuedFileCount+stats.EnqueuedDirCount+stats.EnqueuedSymlinkCount, - stats.RestoredFileCount+stats.RestoredDirCount+stats.RestoredSymlinkCount, - stats.SkippedCount, - stats.IgnoredErrorCount, - stats.EnqueuedTotalFileSize, - stats.RestoredTotalFileSize, - stats.SkippedTotalFileSize, - ) + restoreProgress.SetCounters(stats) } st, err := restore.Entry(ctx, rep, output, rootEntry, restore.Options{ diff --git a/cli/restore_progress.go b/cli/restore_progress.go new file mode 100644 index 00000000000..80d5a621518 --- /dev/null +++ b/cli/restore_progress.go @@ -0,0 +1,116 @@ +package cli + +import ( + "fmt" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/kopia/kopia/internal/timetrack" + "github.com/kopia/kopia/internal/units" + "github.com/kopia/kopia/snapshot/restore" +) + +type cliRestoreProgress struct { + restoredCount atomic.Int32 + enqueuedCount atomic.Int32 + skippedCount atomic.Int32 + ignoredErrorsCount atomic.Int32 + + restoredTotalFileSize atomic.Int64 + enqueuedTotalFileSize atomic.Int64 + skippedTotalFileSize atomic.Int64 + + progressUpdateInterval time.Duration + enableProgress bool + + outputThrottle timetrack.Throttle + outputMutex sync.Mutex + out textOutput // +checklocksignore: outputMutex just happens to be held always. + eta timetrack.Estimator // +checklocksignore: outputMutex just happens to be held always. + + // +checklocks:outputMutex + lastLineLength int +} + +func (p *cliRestoreProgress) SetCounters(s restore.Stats) { + p.enqueuedCount.Store(s.EnqueuedFileCount + s.EnqueuedDirCount + s.EnqueuedSymlinkCount) + p.enqueuedTotalFileSize.Store(s.EnqueuedTotalFileSize) + + p.restoredCount.Store(s.RestoredFileCount + s.RestoredDirCount + s.RestoredSymlinkCount) + p.restoredTotalFileSize.Store(s.RestoredTotalFileSize) + + p.skippedCount.Store(s.SkippedCount) + p.skippedTotalFileSize.Store(s.SkippedTotalFileSize) + + p.ignoredErrorsCount.Store(s.IgnoredErrorCount) + + p.maybeOutput() +} + +func (p *cliRestoreProgress) Flush() { + p.outputThrottle.Reset() + p.output("\n") +} + +func (p *cliRestoreProgress) maybeOutput() { + if p.outputThrottle.ShouldOutput(p.progressUpdateInterval) { + p.output("") + } +} + +func (p *cliRestoreProgress) output(suffix string) { + if !p.enableProgress { + return + } + + // ensure the counters are not going back in an output line compared to the previous one + p.outputMutex.Lock() + defer p.outputMutex.Unlock() + + restoredCount := p.restoredCount.Load() + enqueuedCount := p.enqueuedCount.Load() + skippedCount := p.skippedCount.Load() + ignoredCount := p.ignoredErrorsCount.Load() + + restoredSize := p.restoredTotalFileSize.Load() + enqueuedSize := p.enqueuedTotalFileSize.Load() + skippedSize := p.skippedTotalFileSize.Load() + + if restoredSize == 0 { + return + } + + var maybeRemaining, maybeSkipped, maybeErrors string + if est, ok := p.eta.Estimate(float64(restoredSize), float64(enqueuedSize)); ok { + maybeRemaining = fmt.Sprintf(" %v (%.1f%%) remaining %v", + units.BytesPerSecondsString(est.SpeedPerSecond), + est.PercentComplete, + est.Remaining) + } + + if skippedCount > 0 { + maybeSkipped = fmt.Sprintf(", skipped %v (%v)", skippedCount, units.BytesString(skippedSize)) + } + + if ignoredCount > 0 { + maybeErrors = fmt.Sprintf(", ignored %v errors", ignoredCount) + } + + line := fmt.Sprintf("Processed %v (%v) of %v (%v)%v%v%v.", + restoredCount+skippedCount, units.BytesString(restoredSize), + enqueuedCount, units.BytesString(enqueuedSize), + maybeSkipped, maybeErrors, maybeRemaining, + ) + + var extraSpaces string + + if len(line) < p.lastLineLength { + // add extra spaces to wipe over previous line if it was longer than current + extraSpaces = strings.Repeat(" ", p.lastLineLength-len(line)) + } + + p.lastLineLength = len(line) + p.out.printStderr("\r%v%v%v", line, extraSpaces, suffix) +} diff --git a/snapshot/restore/local_fs_output.go b/snapshot/restore/local_fs_output.go index a2155b8e1a3..b55bab29c03 100644 --- a/snapshot/restore/local_fs_output.go +++ b/snapshot/restore/local_fs_output.go @@ -54,27 +54,16 @@ func getStreamCopier(ctx context.Context, targetpath string, sparse bool) (strea }, nil } -// progressReportingReader is just a wrapper for fs.Reader which is used to capture and pass to cb number of bytes read. +// progressReportingReader wraps fs.Reader Read function to capture the and pass +// the number of bytes read to the callback cb. type progressReportingReader struct { - r fs.Reader + fs.Reader cb FileWriteProgress } -func (r *progressReportingReader) Entry() (fs.Entry, error) { - return r.r.Entry() //nolint:wrapcheck -} - -func (r *progressReportingReader) Seek(offset int64, whence int) (int64, error) { - return r.r.Seek(offset, whence) //nolint:wrapcheck -} - -func (r *progressReportingReader) Close() error { - return r.r.Close() //nolint:wrapcheck -} - func (r *progressReportingReader) Read(p []byte) (int, error) { - bytesRead, err := r.r.Read(p) + bytesRead, err := r.Reader.Read(p) if err == nil && r.cb != nil { r.cb(int64(bytesRead)) } @@ -399,10 +388,8 @@ func write(targetPath string, r fs.Reader, size int64, c streamCopier) error { // close below, as close is idempotent. defer f.Close() //nolint:errcheck - name := f.Name() - if _, err := c(f, r); err != nil { - return errors.Wrap(err, "cannot write data to file %q "+name) + return errors.Wrapf(err, "cannot write data to file %q", f.Name()) } if err := f.Close(); err != nil { @@ -431,9 +418,9 @@ func (o *FilesystemOutput) copyFileContent(ctx context.Context, targetPath strin } defer r.Close() //nolint:errcheck - wr := &progressReportingReader{ - r: r, - cb: progressCb, + rr := &progressReportingReader{ + Reader: r, + cb: progressCb, } log(ctx).Debugf("copying file contents to: %v", targetPath) @@ -441,10 +428,10 @@ func (o *FilesystemOutput) copyFileContent(ctx context.Context, targetPath strin if o.WriteFilesAtomically { //nolint:wrapcheck - return atomicfile.Write(targetPath, wr) + return atomicfile.Write(targetPath, rr) } - return write(targetPath, wr, f.Size(), o.copier) + return write(targetPath, rr, f.Size(), o.copier) } func isEmptyDirectory(name string) (bool, error) { diff --git a/snapshot/restore/restore_progress.go b/snapshot/restore/restore_progress.go deleted file mode 100644 index fad030bbe27..00000000000 --- a/snapshot/restore/restore_progress.go +++ /dev/null @@ -1,10 +0,0 @@ -package restore - -// Progress is invoked by copier to report status of snapshot restoration. -type Progress interface { - SetCounters( - enqueuedCount, restoredCount, skippedCount, ignoredErrors int32, - enqueuedBytes, restoredBytes, skippedBytes int64, - ) - Flush() -} diff --git a/snapshot/snapshotfs/upload.go b/snapshot/snapshotfs/upload.go index 225a30f93f9..6670e1e3e29 100644 --- a/snapshot/snapshotfs/upload.go +++ b/snapshot/snapshotfs/upload.go @@ -145,10 +145,9 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, parentCheckpointRegis defer u.Progress.FinishedHashingFile(relativePath, f.Size()) if pf, ok := f.(snapshot.HasDirEntryOrNil); ok { - switch de, err := pf.DirEntryOrNil(ctx); { - case err != nil: + if de, err := pf.DirEntryOrNil(ctx); err != nil { return nil, errors.Wrap(err, "can't read placeholder") - case err == nil && de != nil: + } else if de != nil { // We have read sufficient information from the shallow file's extended // attribute to construct DirEntry. _, err := u.repo.VerifyObject(ctx, de.ObjectID) @@ -1073,10 +1072,9 @@ type dirReadError struct { func uploadShallowDirInternal(ctx context.Context, directory fs.Directory, u *Uploader) (*snapshot.DirEntry, error) { if pf, ok := directory.(snapshot.HasDirEntryOrNil); ok { - switch de, err := pf.DirEntryOrNil(ctx); { - case err != nil: + if de, err := pf.DirEntryOrNil(ctx); err != nil { return nil, errors.Wrapf(err, "error reading placeholder for %q", directory.Name()) - case err == nil && de != nil: + } else if de != nil { if _, err := u.repo.VerifyObject(ctx, de.ObjectID); err != nil { return nil, errors.Wrapf(err, "invalid placeholder for %q contains foreign object.ID", directory.Name()) } diff --git a/tests/end_to_end_test/restore_test.go b/tests/end_to_end_test/restore_test.go index 04f196233d0..db29ee6315d 100644 --- a/tests/end_to_end_test/restore_test.go +++ b/tests/end_to_end_test/restore_test.go @@ -29,6 +29,7 @@ import ( "github.com/kopia/kopia/internal/stat" "github.com/kopia/kopia/internal/testlogging" "github.com/kopia/kopia/internal/testutil" + "github.com/kopia/kopia/snapshot/restore" "github.com/kopia/kopia/tests/clitestutil" "github.com/kopia/kopia/tests/testdirtree" "github.com/kopia/kopia/tests/testenv" @@ -42,33 +43,19 @@ const ( overriddenDirPermissions = 0o752 ) -type restoreProgressInvocation struct { - enqueuedCount, restoredCount, skippedCount, ignoredErrors int32 - enqueuedBytes, restoredBytes, skippedBytes int64 -} - type fakeRestoreProgress struct { mtx sync.Mutex - invocations []restoreProgressInvocation + invocations []restore.Stats flushesCount int invocationAfterFlush bool } -func (p *fakeRestoreProgress) SetCounters( - enqueuedCount, restoredCount, skippedCount, ignoredErrors int32, - enqueuedBytes, restoredBytes, skippedBytes int64, -) { +func (p *fakeRestoreProgress) SetCounters(s restore.Stats) { p.mtx.Lock() defer p.mtx.Unlock() - p.invocations = append(p.invocations, restoreProgressInvocation{ - enqueuedCount: enqueuedCount, - restoredCount: restoredCount, - skippedCount: skippedCount, - ignoredErrors: ignoredErrors, - enqueuedBytes: enqueuedBytes, - restoredBytes: restoredBytes, - skippedBytes: skippedBytes, - }) + + p.invocations = append(p.invocations, s) + if p.flushesCount > 0 { p.invocationAfterFlush = true }