Skip to content

Commit

Permalink
Ignore and fix compression format errors in target. Performance impro…
Browse files Browse the repository at this point in the history
…vements and code cleanup.
  • Loading branch information
dop251 committed Oct 20, 2021
1 parent 9339e2d commit 60e87ce
Show file tree
Hide file tree
Showing 6 changed files with 850 additions and 125 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
on:
release:
types: [created]

name: Release
jobs:
releases-matrix:
name: Release Go Binary
Expand Down
154 changes: 93 additions & 61 deletions diskrsync/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,8 @@ type remoteProc struct {
cmd *exec.Cmd
}

type targetFile interface {
io.ReadWriteSeeker
io.Closer
}

func usage() {
fmt.Fprintf(os.Stderr, "Usage: %s [--ssh-flags=\"...\"] [--no-compress] [--verbose] <src> <dst>\nsrc and dst is [[user@]host:]path\n", os.Args[0])
_, _ = fmt.Fprintf(os.Stderr, "Usage: %s [--ssh-flags=\"...\"] [--no-compress] [--verbose] <src> <dst>\nsrc and dst is [[user@]host:]path\n", os.Args[0])
os.Exit(2)
}

Expand Down Expand Up @@ -118,41 +113,51 @@ func (p *localProc) Start(cmdReader io.Reader, cmdWriter io.WriteCloser, errChan
}

func (p *localProc) run(cmdReader io.Reader, cmdWriter io.WriteCloser, errChan chan error) {
var err error
if p.mode == modeSource {
errChan <- doSource(p.p, cmdReader, cmdWriter, p.opts)
err = doSource(p.p, cmdReader, cmdWriter, p.opts)
} else {
errChan <- doTarget(p.p, cmdReader, cmdWriter, p.opts)
err = doTarget(p.p, cmdReader, cmdWriter, p.opts)
}

cerr := cmdWriter.Close()
if err == nil {
err = cerr
}
errChan <- err
}

cmdWriter.Close()
func (p *remoteProc) pipeCopy(dst io.WriteCloser, src io.Reader) {
_, err := io.Copy(dst, src)
if err != nil {
log.Printf("pipe copy failed: %v", err)
}
err = dst.Close()
if err != nil {
log.Printf("close failed after pipe copy: %v", err)
}
}

func (p *remoteProc) Start(cmdReader io.Reader, cmdWriter io.WriteCloser, errChan chan error) error {
p.cmd.Stdout = cmdWriter
p.cmd.Stderr = os.Stderr
p.cmd.Stdin = cmdReader

w, err := p.cmd.StdinPipe()
r, err := p.cmd.StdoutPipe()
if err != nil {
return err
}

go func() {
io.Copy(w, cmdReader)
w.Close()
}()

err = p.cmd.Start()
if err != nil {
return err
}
go p.run(cmdWriter, errChan)
go p.run(cmdWriter, r, errChan)
return nil
}

func (p *remoteProc) run(writer io.Closer, errChan chan error) {
err := p.cmd.Wait()
writer.Close()
errChan <- err
func (p *remoteProc) run(w io.WriteCloser, r io.Reader, errChan chan error) {
p.pipeCopy(w, r)
errChan <- p.cmd.Wait()
}

func doSource(p string, cmdReader io.Reader, cmdWriter io.WriteCloser, opts *options) error {
Expand All @@ -176,83 +181,93 @@ func doSource(p string, cmdReader io.Reader, cmdWriter io.WriteCloser, opts *opt
src = sf
}

size, err := src.Seek(0, os.SEEK_END)
size, err := src.Seek(0, io.SeekEnd)
if err != nil {
return err
}

_, err = src.Seek(0, os.SEEK_SET)
_, err = src.Seek(0, io.SeekStart)
if err != nil {
return err
}

err = diskrsync.Source(src, size, cmdReader, cmdWriter, true, opts.verbose)
cmdWriter.Close()
cerr := cmdWriter.Close()
if err == nil {
err = cerr
}
return err
}

func doTarget(p string, cmdReader io.Reader, cmdWriter io.WriteCloser, opts *options) error {
var w targetFile
useBuffer := false
func doTarget(p string, cmdReader io.Reader, cmdWriter io.WriteCloser, opts *options) (err error) {
var w spgz.SparseFile
useReadBuffer := false

f, err := os.OpenFile(p, os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
return err
return
}

info, err := f.Stat()
if err != nil {
f.Close()
return err
_ = f.Close()
return
}

if info.Mode() & (os.ModeDevice | os.ModeCharDevice) != 0 {
w = f
useBuffer = true
if info.Mode()&(os.ModeDevice|os.ModeCharDevice) != 0 {
w = spgz.NewSparseFileWithoutHolePunching(f)
useReadBuffer = true
} else if !opts.noCompress {
sf, err := spgz.NewFromFileSize(f, os.O_RDWR|os.O_CREATE, diskrsync.DefTargetBlockSize)
if err != nil {
if err != spgz.ErrInvalidFormat {
if err == spgz.ErrPunchHoleNotSupported {
err = fmt.Errorf("target does not support compression. Try with -no-compress option (error was '%v')", err)
}
f.Close()
_ = f.Close()
return err
}
} else {
w = sf
w = &diskrsync.FixingSpgzFileWrapper{SpgzFile: sf}
}
}

if w == nil {
w = spgz.NewSparseWriter(spgz.NewSparseFileWithFallback(f))
useBuffer = true
w = spgz.NewSparseFileWithFallback(f)
useReadBuffer = true
}

defer w.Close()
defer func() {
cerr := w.Close()
if err == nil {
err = cerr
}
}()

size, err := w.Seek(0, os.SEEK_END)
size, err := w.Seek(0, io.SeekEnd)
if err != nil {
return err
}

_, err = w.Seek(0, os.SEEK_SET)
_, err = w.Seek(0, io.SeekStart)

if err != nil {
return err
}

err = diskrsync.Target(w, size, cmdReader, cmdWriter, useBuffer, opts.verbose)
cmdWriter.Close()
err = diskrsync.Target(w, size, cmdReader, cmdWriter, useReadBuffer, opts.verbose)
cerr := cmdWriter.Close()
if err == nil {
err = cerr
}

return err
return
}

func doCmd(opts *options) bool {
func doCmd(opts *options) (err error) {
src, err := createProc(flag.Arg(0), modeSource, opts)
if err != nil {
log.Printf("Could not create source: %v", err)
return false
return fmt.Errorf("could not create source: %w", err)
}

path := flag.Arg(1)
Expand All @@ -262,8 +277,7 @@ func doCmd(opts *options) bool {

dst, err := createProc(path, modeTarget, opts)
if err != nil {
log.Printf("Could not create target: %v", err)
return false
return fmt.Errorf("could not create target: %w", err)
}

srcErrChan := make(chan error, 1)
Expand All @@ -276,24 +290,42 @@ func doCmd(opts *options) bool {
sw := &diskrsync.CountingWriteCloser{WriteCloser: srcWriter}

if opts.verbose {
src.Start(sr, sw, srcErrChan)
err = src.Start(sr, sw, srcErrChan)
} else {
src.Start(srcReader, srcWriter, srcErrChan)
err = src.Start(srcReader, srcWriter, srcErrChan)
}

if err != nil {
return fmt.Errorf("could not start source: %w", err)
}

dst.Start(dstReader, dstWriter, dstErrChan)
dstErr := <-dstErrChan
if dstErr != nil {
log.Printf("Target error: %v", dstErr)
err = dst.Start(dstReader, dstWriter, dstErrChan)
if err != nil {
return fmt.Errorf("could not start target: %w", err)
}
srcErr := <-srcErrChan
if srcErr != nil {
log.Printf("Source error: %v", srcErr)

L:
for srcErrChan != nil || dstErrChan != nil {
select {
case dstErr := <-dstErrChan:
if dstErr != nil {
err = fmt.Errorf("target error: %w", dstErr)
break L
}
dstErrChan = nil
case srcErr := <-srcErrChan:
if srcErr != nil {
err = fmt.Errorf("source error: %w", srcErr)
break L
}
srcErrChan = nil
}
}

if opts.verbose {
log.Printf("Read: %d, wrote: %d\n", sr.Count(), sw.Count())
}
return srcErr == nil && dstErr == nil
return
}

func main() {
Expand Down Expand Up @@ -328,9 +360,9 @@ func main() {
if flag.Arg(0) == "" || flag.Arg(1) == "" {
usage()
}
ok := doCmd(&opts)
if !ok {
os.Exit(1)
err := doCmd(&opts)
if err != nil {
log.Fatal(err)
}
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ module github.com/dop251/diskrsync
go 1.16

require (
github.com/dop251/spgz v0.0.0-20180204132655-b86304a2b188
github.com/dop251/spgz v1.1.0
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519
)
12 changes: 10 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
github.com/dop251/spgz v0.0.0-20180204132655-b86304a2b188 h1:UYxfuh/hzW7AmbV9eAhr3mh8Qjr34z4fD5PlFbBaiUI=
github.com/dop251/spgz v0.0.0-20180204132655-b86304a2b188/go.mod h1:LNJUPCpuM80Fs0wTQ3+0oRp6h26KO5mAv4jOTLShJ8w=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dop251/buse v0.0.0-20170916130217-f7a5c857babd/go.mod h1:hQb8UeARubyuKpfzN+fkxK9TTytxdWRCalLjf/FQgwk=
github.com/dop251/nbd v0.0.0-20170916130042-b8933b281cb7/go.mod h1:/YqO/I24sucjxhCgQHgDrnffSwg5HzoYHQASayZnYl8=
github.com/dop251/spgz v1.1.0 h1:y49BXvoyhF+Y9No69DCJLqTCACleK27B73XWsXa2nFU=
github.com/dop251/spgz v1.1.0/go.mod h1:aXXbApWJzaK6jzPiIWWLFi3k47VmRFfunUp2ANdQFD8=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
Loading

0 comments on commit 60e87ce

Please sign in to comment.