Skip to content

Commit

Permalink
Implementing GC Using Version Vector (#981)
Browse files Browse the repository at this point in the history
Co-authored-by: Youngteac Hong <[email protected]>
  • Loading branch information
JOOHOJANG and hackerwins authored Sep 26, 2024
1 parent eefccb0 commit eec25b3
Show file tree
Hide file tree
Showing 27 changed files with 1,922 additions and 1,091 deletions.
13 changes: 13 additions & 0 deletions api/converter/from_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,25 @@ func FromChangePack(pbPack *api.ChangePack) (*change.Pack, error) {
return nil, err
}

versionVector, err := FromVersionVector(pbPack.VersionVector)
if err != nil {
return nil, err
}

pack := &change.Pack{
DocumentKey: key.Key(pbPack.DocumentKey),
Checkpoint: fromCheckpoint(pbPack.Checkpoint),
Changes: changes,
MinSyncedTicket: minSyncedTicket,
IsRemoved: pbPack.IsRemoved,
VersionVector: versionVector,
}

if pbPack.MinSyncedVersionVector != nil {
pack.MinSyncedVersionVector, err = FromVersionVector(pbPack.MinSyncedVersionVector)
if err != nil {
return nil, err
}
}

if pbPack.Snapshot != nil {
Expand Down
6 changes: 6 additions & 0 deletions api/converter/to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,18 @@ func ToChangePack(pack *change.Pack) (*api.ChangePack, error) {
return nil, err
}

pbVersionVector, err := ToVersionVector(pack.VersionVector)
if err != nil {
return nil, err
}

return &api.ChangePack{
DocumentKey: pack.DocumentKey.String(),
Checkpoint: ToCheckpoint(pack.Checkpoint),
Changes: pbChanges,
Snapshot: pack.Snapshot,
MinSyncedTicket: ToTimeTicket(pack.MinSyncedTicket),
VersionVector: pbVersionVector,
IsRemoved: pack.IsRemoved,
}, nil
}
Expand Down
8 changes: 7 additions & 1 deletion api/docs/yorkie/v1/resources.openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ components:
minSyncedTicket:
$ref: '#/components/schemas/yorkie.v1.TimeTicket'
additionalProperties: false
description: ""
description: Deprecated
title: min_synced_ticket
type: object
minSyncedVersionVector:
Expand All @@ -280,6 +280,12 @@ components:
description: ""
title: snapshot_version_vector
type: object
versionVector:
$ref: '#/components/schemas/yorkie.v1.VersionVector'
additionalProperties: false
description: ""
title: version_vector
type: object
title: ChangePack
type: object
yorkie.v1.Checkpoint:
Expand Down
8 changes: 7 additions & 1 deletion api/docs/yorkie/v1/yorkie.openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ components:
minSyncedTicket:
$ref: '#/components/schemas/yorkie.v1.TimeTicket'
additionalProperties: false
description: ""
description: Deprecated
title: min_synced_ticket
type: object
minSyncedVersionVector:
Expand All @@ -488,6 +488,12 @@ components:
description: ""
title: snapshot_version_vector
type: object
versionVector:
$ref: '#/components/schemas/yorkie.v1.VersionVector'
additionalProperties: false
description: ""
title: version_vector
type: object
title: ChangePack
type: object
yorkie.v1.Checkpoint:
Expand Down
1,569 changes: 791 additions & 778 deletions api/yorkie/v1/resources.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions api/yorkie/v1/resources.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ message ChangePack {
TimeTicket min_synced_ticket = 5; // Deprecated
VersionVector min_synced_version_vector = 8;
bool is_removed = 6;
VersionVector version_vector = 9;
}

message Change {
Expand Down
2 changes: 1 addition & 1 deletion cmd/yorkie/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func Preload(_ *cobra.Command, _ []string) error {
}

if err := viper.ReadInConfig(); err != nil {
return fmt.Errorf("failed to read in config: %w", err)
return fmt.Errorf("read in config: %w", err)
}
return nil
}
24 changes: 13 additions & 11 deletions pkg/document/change/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ func (id ID) NewTimeTicket(delimiter uint32) *time.Ticket {
func (id ID) SyncClocks(other ID) ID {
lamport := id.lamport + 1
if id.lamport < other.lamport {
lamport = other.lamport
lamport = other.lamport + 1
}

newID := NewID(id.clientSeq, InitialServerSeq, lamport, id.actorID, id.versionVector)
newID.versionVector.Set(other.actorID, other.lamport)
newID := NewID(id.clientSeq, InitialServerSeq, lamport, id.actorID, id.versionVector.Max(other.versionVector))
newID.versionVector.Set(id.actorID, lamport)
return newID
}

Expand All @@ -114,16 +114,18 @@ func (id ID) SyncClocks(other ID) ID {
func (id ID) SetClocks(otherLamport int64, vector time.VersionVector) ID {
lamport := id.lamport + 1
if id.lamport < otherLamport {
lamport = otherLamport
lamport = otherLamport + 1
}

return NewID(
id.clientSeq,
id.serverSeq,
lamport,
id.actorID,
vector,
)
newID := NewID(id.clientSeq, id.serverSeq, lamport, id.actorID, id.versionVector.Max(vector))
newID.versionVector.Set(id.actorID, lamport)

return newID
}

// SetVersionVector sets version vector
func (id ID) SetVersionVector(vector time.VersionVector) ID {
return NewID(id.clientSeq, id.serverSeq, id.lamport, id.actorID, vector)
}

// SetActor sets actorID.
Expand Down
13 changes: 9 additions & 4 deletions pkg/document/change/pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type Pack struct {
// Snapshot is a byte array that encode the document.
Snapshot []byte

// VersionVector is the version vector of the document.
VersionVector time.VersionVector

// SnapshotVersionVector is the version vector of the snapshot if it exists.
SnapshotVersionVector time.VersionVector

Expand All @@ -55,13 +58,15 @@ func NewPack(
key key.Key,
cp Checkpoint,
changes []*Change,
versionVector time.VersionVector,
snapshot []byte,
) *Pack {
return &Pack{
DocumentKey: key,
Checkpoint: cp,
Changes: changes,
Snapshot: snapshot,
DocumentKey: key,
Checkpoint: cp,
Changes: changes,
VersionVector: versionVector,
Snapshot: snapshot,
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/document/crdt/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (r *Root) GarbageCollect(vector time.VersionVector) (int, error) {
count := 0

for _, pair := range r.gcElementPairMap {
if vector.After(pair.elem.RemovedAt()) {
if vector.EqualToOrAfter(pair.elem.RemovedAt()) {
if err := pair.parent.Purge(pair.elem); err != nil {
return 0, err
}
Expand All @@ -157,7 +157,7 @@ func (r *Root) GarbageCollect(vector time.VersionVector) (int, error) {
}

for _, pair := range r.gcNodePairMap {
if vector.After(pair.Child.RemovedAt()) {
if vector.EqualToOrAfter(pair.Child.RemovedAt()) {
if err := pair.Parent.Purge(pair.Child); err != nil {
return 0, err
}
Expand Down
12 changes: 11 additions & 1 deletion pkg/document/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,17 @@ func (d *Document) ApplyChangePack(pack *change.Pack) error {
d.GarbageCollect(pack.MinSyncedVersionVector)
}

// 05. Update the status.
// 05. Remove detached client's lamport from version vector if it exists
if pack.MinSyncedVersionVector != nil {
actorIDs, err := pack.MinSyncedVersionVector.Keys()
if err != nil {
return err
}

d.doc.changeID = d.doc.changeID.SetVersionVector(d.doc.changeID.VersionVector().Filter(actorIDs))
}

// 06. Update the status.
if pack.IsRemoved {
d.SetStatus(StatusRemoved)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/document/document_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,14 +558,14 @@ func TestDocument(t *testing.T) {
docB.SetActor(actorB)
assert.Equal(t, "{}", docB.VersionVector().Marshal())
assert.NoError(t, docB.ApplyChangePack(packA))
assert.Equal(t, "{000000000000000000000001:2}", docB.VersionVector().Marshal())
assert.Equal(t, "{000000000000000000000001:2,000000000000000000000002:3}", docB.VersionVector().Marshal())

assert.NoError(t, docB.Update(func(r *json.Object, p *presence.Presence) error {
r.SetString("k2", "3")
return nil
}))
assert.Equal(t, int64(2), docB.VersionVector().VersionOf(actorA))
assert.Equal(t, int64(3), docB.VersionVector().VersionOf(actorB))
assert.Equal(t, int64(4), docB.VersionVector().VersionOf(actorB))
packB := docB.CreateChangePack()
packB.MinSyncedTicket = time.InitialTicket
assert.True(t, packB.Changes[0].AfterOrEqual(packA.Changes[1]))
Expand Down
12 changes: 11 additions & 1 deletion pkg/document/internal_document.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,16 @@ func (d *InternalDocument) ApplyChangePack(pack *change.Pack, disableGC bool) er
}
}

// 04. Remove detached client's lamport from version vector if it exists
if pack.MinSyncedVersionVector != nil {
actorIDs, err := pack.MinSyncedVersionVector.Keys()
if err != nil {
return err
}

d.changeID = d.changeID.SetVersionVector(d.changeID.VersionVector().Filter(actorIDs))
}

return nil
}

Expand All @@ -196,7 +206,7 @@ func (d *InternalDocument) CreateChangePack() *change.Pack {
changes := d.localChanges

cp := d.checkpoint.IncreaseClientSeq(uint32(len(changes)))
return change.NewPack(d.key, cp, changes, nil)
return change.NewPack(d.key, cp, changes, d.VersionVector(), nil)
}

// SetActor sets actor into this document. This is also applied in the local
Expand Down
Loading

0 comments on commit eec25b3

Please sign in to comment.