Skip to content

Commit

Permalink
try to reduce the diff snapshot size
Browse files Browse the repository at this point in the history
  • Loading branch information
Termina1 committed Nov 27, 2024
1 parent 2791c98 commit cff8ebc
Showing 1 changed file with 48 additions and 8 deletions.
56 changes: 48 additions & 8 deletions sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

const SyncBlockBits = 28
const SyncBlockMask = (rdx.ID(1) << SyncBlockBits) - 1
const MaxParcelSize = 5_000_000 // 5MB

type SyncHost interface {
protocol.Drainer
Expand Down Expand Up @@ -332,31 +333,67 @@ func (sync *Syncer) FeedHandshake() (vv protocol.Records, err error) {
return protocol.Records{hs}, nil
}

func (sync *Syncer) FeedBlockDiff() (diff protocol.Records, err error) {
if !sync.vvit.Next() {
return nil, io.EOF
}
func (sync *Syncer) getVVChanges() (hasChanges bool, sendvv rdx.VV, err error) {
vv := make(rdx.VV)
err = vv.PutTLV(sync.vvit.Value())
if err != nil {
return nil, rdx.ErrBadVRecord
return false, nil, rdx.ErrBadVRecord
}
sendvv := make(rdx.VV)
sendvv = make(rdx.VV)
// check for any changes
hasChanges := false // fixme up & repeat
hasChanges = false // fixme up & repeat
for src, pro := range vv {
peerpro, ok := sync.peervv[src]
if !ok || pro > peerpro {
sendvv[src] = peerpro
hasChanges = true
}
}
return
}

func (sync *Syncer) nextBlockDiff() (bool, rdx.VV, error) {
if sync.ffit != nil {
block := VKeyId(sync.vvit.Key()).ZeroOff()
till := block + SyncBlockMask + 1
if sync.ffit.Valid() {
id, _ := OKeyIdRdt(sync.ffit.Key())
if id != rdx.BadId && id < till {
_, sendvv, err := sync.getVVChanges()
if err != nil {
return false, nil, err
}
return true, sendvv, nil
}
}
}
if sync.vvit == nil || !sync.vvit.Next() {
return false, nil, io.EOF
}
hasChanges, sendvv, err := sync.getVVChanges()
if err != nil {
return false, nil, err
}
if !hasChanges {
return protocol.Records{}, nil
return false, nil, nil
}

block := VKeyId(sync.vvit.Key()).ZeroOff()
key := OKey(block, 0)
sync.ffit.SeekGE(key)
return true, sendvv, nil
}

func (sync *Syncer) FeedBlockDiff() (diff protocol.Records, err error) {
hasChanges, sendvv, cerr := sync.nextBlockDiff()
if cerr != nil {
return nil, cerr
}
if !hasChanges {
return protocol.Records{}, nil
}

block := VKeyId(sync.vvit.Key()).ZeroOff()
bmark, parcel := protocol.OpenHeader(nil, 'D')
parcel = append(parcel, protocol.Record('T', sync.snaplast.ZipBytes())...)
parcel = append(parcel, protocol.Record('R', block.ZipBytes())...)
Expand All @@ -366,6 +403,9 @@ func (sync *Syncer) FeedBlockDiff() (diff protocol.Records, err error) {
if id == rdx.BadId || id >= till {
break
}
if len(parcel) > MaxParcelSize {
break
}
lim, ok := sendvv[id.Src()]
if ok && (id.Pro() > lim || lim == 0) {
parcel = append(parcel, protocol.Record('F', rdx.ZipUint64(uint64(id-block)))...)
Expand Down

0 comments on commit cff8ebc

Please sign in to comment.