Skip to content

Commit

Permalink
Handle local changes correctly when receiving snapshot (#923)
Browse files Browse the repository at this point in the history
This commit addresses the issue where unsent local changes were
missing from `Document.Root` when receiving a snapshot from the server.

To resolve this problem, the local changes are now applied to
`Document.Root` after applying the snapshot.

---------

Co-authored-by: Youngteac Hong <[email protected]>
  • Loading branch information
raararaara and hackerwins authored Jul 11, 2024
1 parent 1d96ea0 commit e2f7869
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 19 deletions.
8 changes: 5 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,9 +337,11 @@ func (c *Client) Attach(ctx context.Context, doc *document.Document, options ...
c.attachments[doc.Key()].watchCtx = watchCtx
c.attachments[doc.Key()].closeWatchStream = cancelFunc

err = c.runWatchLoop(watchCtx, doc)
if err != nil {
return err
if !opts.IsManual {
err = c.runWatchLoop(watchCtx, doc)
if err != nil {
return err
}
}

return nil
Expand Down
6 changes: 6 additions & 0 deletions client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,19 @@ type AttachOption func(*AttachOptions)
type AttachOptions struct {
// Presence is the presence of the client.
Presence innerpresence.Presence
IsManual bool
}

// WithPresence configures the presence of the client.
func WithPresence(presence innerpresence.Presence) AttachOption {
return func(o *AttachOptions) { o.Presence = presence }
}

// WithManualSync configures the manual sync of the client.
func WithManualSync() AttachOption {
return func(o *AttachOptions) { o.IsManual = true }
}

// DetachOption configures DetachOptions.
type DetachOption func(*DetachOptions)

Expand Down
45 changes: 29 additions & 16 deletions pkg/document/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,24 +189,9 @@ func (d *Document) ApplyChangePack(pack *change.Pack) error {
return err
}
} else {
if err := d.ensureClone(); err != nil {
if err := d.applyChanges(pack.Changes); err != nil {
return err
}

for _, c := range pack.Changes {
if err := c.Execute(d.cloneRoot, d.clonePresences); err != nil {
return err
}
}

events, err := d.doc.ApplyChanges(pack.Changes...)
if err != nil {
return err
}

for _, e := range events {
d.events <- e
}
}

// 02. Remove local changes applied to server.
Expand All @@ -218,6 +203,12 @@ func (d *Document) ApplyChangePack(pack *change.Pack) error {
d.doc.localChanges = d.doc.localChanges[1:]
}

if len(pack.Snapshot) > 0 {
if err := d.applyChanges(d.doc.localChanges); err != nil {
return err
}
}

// 03. Update the checkpoint.
d.doc.checkpoint = d.doc.checkpoint.Forward(pack.Checkpoint)

Expand All @@ -234,6 +225,28 @@ func (d *Document) ApplyChangePack(pack *change.Pack) error {
return nil
}

func (d *Document) applyChanges(changes []*change.Change) error {
if err := d.ensureClone(); err != nil {
return err
}

for _, c := range changes {
if err := c.Execute(d.cloneRoot, d.clonePresences); err != nil {
return err
}
}

events, err := d.doc.ApplyChanges(changes...)
if err != nil {
return err
}

for _, e := range events {
d.events <- e
}
return nil
}

// InternalDocument returns the internal document.
func (d *Document) InternalDocument() *InternalDocument {
return d.doc
Expand Down
32 changes: 32 additions & 0 deletions pkg/document/document_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ import (

"github.com/stretchr/testify/assert"

"github.com/yorkie-team/yorkie/api/converter"
"github.com/yorkie-team/yorkie/pkg/document"
"github.com/yorkie-team/yorkie/pkg/document/change"
"github.com/yorkie-team/yorkie/pkg/document/crdt"
"github.com/yorkie-team/yorkie/pkg/document/json"
"github.com/yorkie-team/yorkie/pkg/document/presence"
"github.com/yorkie-team/yorkie/pkg/document/time"
"github.com/yorkie-team/yorkie/test/helper"
)

var (
Expand Down Expand Up @@ -549,4 +551,34 @@ func TestDocument(t *testing.T) {
doc.GarbageCollect(time.MaxTicket)
assert.Equal(t, 2, doc.Root().GetText("k1").TreeByID().Len())
})

t.Run("handle local changes correctly when receiving snapshot test", func(t *testing.T) {
// 01. Create a document and a counter.
doc := document.New("d1")
assert.NoError(t, doc.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewCounter("c", crdt.IntegerCnt, 0)
return nil
}))

// 02. Increase the counter until the snapshot threshold and create a snapshot.
for i := 0; i < int(helper.SnapshotThreshold); i++ {
assert.NoError(t, doc.Update(func(root *json.Object, p *presence.Presence) error {
root.GetCounter("c").Increase(1)
return nil
}))
}
snapshot, _ := converter.SnapshotToBytes(doc.RootObject(), doc.AllPresences())
pack := change.NewPack(doc.Key(), doc.CreateChangePack().Checkpoint, nil, snapshot)

// 03. Make a local change before applying changePack.
assert.NoError(t, doc.Update(func(root *json.Object, p *presence.Presence) error {
root.GetCounter("c").Increase(1)
return nil
}))
expectedCount := doc.Root().GetCounter("c").Value()

// 04. Apply the changePack and check if the counter value is correct.
assert.NoError(t, doc.ApplyChangePack(pack))
assert.Equal(t, doc.Root().GetCounter("c").Value(), expectedCount)
})
}

0 comments on commit e2f7869

Please sign in to comment.