Skip to content

Commit

Permalink
Fixed "unexpected EOF" error when the source is expanded and the last…
Browse files Browse the repository at this point in the history
… block is zeros
  • Loading branch information
dop251 committed Sep 30, 2021
1 parent ba78d23 commit f6b7a51
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 29 deletions.
39 changes: 24 additions & 15 deletions diskrsync/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ func createRemoteProc(host, path string, mode int, opts *options) (proc, error)
m += " --no-compress"
}
}
if opts.verbose {
m += " --verbose"
}

args := make([]string, 1, 8)
args[0] = "ssh"
Expand Down Expand Up @@ -245,10 +248,11 @@ func doTarget(p string, cmdReader io.Reader, cmdWriter io.WriteCloser, opts *opt
return err
}

func doCmd(opts *options) error {
func doCmd(opts *options) bool {
src, err := createProc(flag.Arg(0), modeSource, opts)
if err != nil {
return err
log.Printf("Could not create source: %v", err)
return false
}

path := flag.Arg(1)
Expand All @@ -258,10 +262,12 @@ func doCmd(opts *options) error {

dst, err := createProc(path, modeTarget, opts)
if err != nil {
return err
log.Printf("Could not create target: %v", err)
return false
}

errChan := make(chan error, 1)
srcErrChan := make(chan error, 1)
dstErrChan := make(chan error, 1)

srcReader, dstWriter := io.Pipe()
dstReader, srcWriter := io.Pipe()
Expand All @@ -270,21 +276,24 @@ func doCmd(opts *options) error {
sw := &diskrsync.CountingWriteCloser{WriteCloser: srcWriter}

if opts.verbose {
src.Start(sr, sw, errChan)
src.Start(sr, sw, srcErrChan)
} else {
src.Start(srcReader, srcWriter, errChan)
src.Start(srcReader, srcWriter, srcErrChan)
}

dst.Start(dstReader, dstWriter, errChan)
err = <-errChan
if err != nil {
return err
dst.Start(dstReader, dstWriter, dstErrChan)
dstErr := <-dstErrChan
if dstErr != nil {
log.Printf("Target error: %v", dstErr)
}
srcErr := <-srcErrChan
if srcErr != nil {
log.Printf("Source error: %v", srcErr)
}
err = <-errChan
if opts.verbose {
log.Printf("Read: %d, wrote: %d\n", sr.Count(), sw.Count())
}
return err
return srcErr == nil && dstErr == nil
}

func main() {
Expand Down Expand Up @@ -319,9 +328,9 @@ func main() {
if flag.Arg(0) == "" || flag.Arg(1) == "" {
usage()
}
err := doCmd(&opts)
if err != nil {
log.Fatalf(err.Error())
ok := doCmd(&opts)
if !ok {
os.Exit(1)
}
}

Expand Down
37 changes: 25 additions & 12 deletions sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,10 @@ func (t *tree) calc(verbose bool) error {

b := wi.buf[:n.size]
r, err := io.ReadFull(reader, b)
rr += int64(r)
if err != nil {
return err
return fmt.Errorf("in calc at %d (expected %d, read %d): %w", rr, len(b), r, err)
}
rr += int64(r)

wi.n = n

Expand Down Expand Up @@ -341,7 +341,7 @@ func (t *tree) calc(verbose bool) error {
workItems[workIdx].hashReady <- struct{}{}

if rr < t.size {
return fmt.Errorf("Read less data (%d) than expected (%d)", rr, t.size)
return fmt.Errorf("read less data (%d) than expected (%d)", rr, t.size)
}

return nil
Expand Down Expand Up @@ -382,7 +382,7 @@ func Source(reader io.ReadSeeker, size int64, cmdReader io.Reader, cmdWriter io.
var remoteSize int64
remoteSize, err = readHeader(cmdReader)
if err != nil {
return
return fmt.Errorf("could not read header: %w", err)
}

var commonSize int64
Expand Down Expand Up @@ -420,6 +420,9 @@ func Source(reader io.ReadSeeker, size int64, cmdReader io.Reader, cmdWriter io.

if size > commonSize {
// Write the tail
if verbose {
log.Print("Writing tail...")
}
_, err = reader.Seek(commonSize, io.SeekStart)
if err != nil {
return
Expand All @@ -439,10 +442,11 @@ func Source(reader io.ReadSeeker, size int64, cmdReader io.Reader, cmdWriter io.
break
}
if err != io.ErrUnexpectedEOF {
return
return fmt.Errorf("source, reading tail: %w", err)
}
buf = buf[:r]
stop = true
err = nil
}
if spgz.IsBlockZero(buf) {
if holeStart == -1 {
Expand Down Expand Up @@ -502,7 +506,7 @@ func (s *source) subtree(root *node, offset, size int64) (err error) {

_, err = io.ReadFull(s.cmdReader, remoteHash)
if err != nil {
return
return fmt.Errorf("source/subtree, reading hash: %w", err)
}

if bytes.Equal(root.sum, remoteHash) {
Expand All @@ -525,7 +529,7 @@ func (s *source) subtree(root *node, offset, size int64) (err error) {
buf := s.buffer(size)
_, err = io.ReadFull(s.reader, buf)
if err != nil {
return
return fmt.Errorf("source read failed at %d: %w", offset, err)
}

if spgz.IsBlockZero(buf) {
Expand Down Expand Up @@ -576,6 +580,10 @@ func Target(writer io.ReadWriteSeeker, size int64, cmdReader io.Reader, cmdWrite
commonSize = remoteSize
}

if verbose {
log.Printf("Local size: %d, remote size: %d", size, remoteSize)
}

if commonSize > 0 {
t := target{
base: base{
Expand Down Expand Up @@ -603,7 +611,9 @@ func Target(writer io.ReadWriteSeeker, size int64, cmdReader io.Reader, cmdWrite

if size < remoteSize {
// Read the tail
// log.Printf("Reading tail (%d bytes)...\n", remoteSize-size)
if verbose {
log.Printf("Reading tail (%d bytes)...", remoteSize-size)
}
_, err = writer.Seek(commonSize, io.SeekStart)
if err != nil {
return
Expand All @@ -620,7 +630,7 @@ func Target(writer io.ReadWriteSeeker, size int64, cmdReader io.Reader, cmdWrite
err = nil
break
}
return
return fmt.Errorf("target: while reading tail block header: %w", err)
}

if cmd == cmdBlock {
Expand All @@ -632,15 +642,15 @@ func Target(writer io.ReadWriteSeeker, size int64, cmdReader io.Reader, cmdWrite
err = nil
break
} else {
return
return fmt.Errorf("target: while copying block: %w", err)
}
}
} else {
if cmd == cmdHole {
var holeSize int64
err = binary.Read(rd, binary.LittleEndian, &holeSize)
if err != nil {
return
return fmt.Errorf("target: while reading hole size: %w", err)
}
_, err = writer.Seek(holeSize, io.SeekCurrent)
if err != nil {
Expand Down Expand Up @@ -678,7 +688,7 @@ func (t *target) subtree(root *node, offset, size int64) (err error) {
var cmd byte
err = binary.Read(t.cmdReader, binary.LittleEndian, &cmd)
if err != nil {
return
return fmt.Errorf("target: while reading block header at %d: %w", offset, err)
}

// log.Printf("offset: %d, size: %d, cmd: %d\n", offset, size, cmd)
Expand All @@ -692,6 +702,9 @@ func (t *target) subtree(root *node, offset, size int64) (err error) {

if cmd == cmdNotEqual {
_, err = io.CopyN(t.writer, t.cmdReader, size)
if err != nil {
err = fmt.Errorf("while copying block data at %d: %w", offset, err)
}
} else {
buf := t.buffer(size)
for i := int64(0); i < size; i++ {
Expand Down
26 changes: 24 additions & 2 deletions sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,20 @@ func TestExpand(t *testing.T) {
syncAndCheckEqual(src, dst, t)
}

func TestExpandWithZeros(t *testing.T) {
dst := make([]byte, 2*1024*1024)

for i := 0; i < len(dst); i++ {
dst[i] = byte(rand.Int31n(256))
}

src := make([]byte, 2*1024*1024+333333)

copy(src, dst)

syncAndCheckEqual(src, dst, t)
}

func TestShrink(t *testing.T) {
dst := make([]byte, 2*1024*1024+333333)

Expand Down Expand Up @@ -191,12 +205,16 @@ func syncAndCheckEqual(src, dst []byte, t *testing.T) (sent, received int64) {
dstReaderC := &CountingReader{Reader: dstReader}
dstWriterC := &CountingWriteCloser{WriteCloser: dstWriter}

srcErrChan := make(chan error, 1)

go func() {
err := Source(srcR, int64(len(src)), srcReader, srcWriter, false, false)
srcWriter.Close()
if err != nil {
t.Fatal(err)
srcErrChan <- err
return
}
srcWriter.Close()
srcErrChan <- nil
}()

err := Target(dstW, int64(len(dst)), dstReaderC, dstWriterC, false, false)
Expand All @@ -206,6 +224,10 @@ func syncAndCheckEqual(src, dst []byte, t *testing.T) (sent, received int64) {
t.Fatal(err)
}

if err = <-srcErrChan; err != nil {
t.Fatal(err)
}

if !bytes.Equal(srcR.Bytes(), dstW.Bytes()) {
t.Fatal("Not equal")
}
Expand Down

0 comments on commit f6b7a51

Please sign in to comment.