Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce VersionVector to detect the relationship between changes #800

Closed
wants to merge 11 commits into from
6 changes: 6 additions & 0 deletions admin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,10 +327,16 @@ func (c *Client) ListChangeSummaries(
return nil, err
}

vector, err := converter.FromVersionVector(snapshotMeta.Msg.VersionVector)
if err != nil {
return nil, err
}

newDoc, err := document.NewInternalDocumentFromSnapshot(
key,
seq,
snapshotMeta.Msg.Lamport,
vector,
snapshotMeta.Msg.Snapshot,
)

Expand Down
42 changes: 39 additions & 3 deletions api/converter/from_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,23 @@ func FromChangePack(pbPack *api.ChangePack) (*change.Pack, error) {
return nil, err
}

return &change.Pack{
pack := &change.Pack{
DocumentKey: key.Key(pbPack.DocumentKey),
Checkpoint: fromCheckpoint(pbPack.Checkpoint),
Changes: changes,
Snapshot: pbPack.Snapshot,
MinSyncedTicket: minSyncedTicket,
IsRemoved: pbPack.IsRemoved,
}, nil
}

if pbPack.Snapshot != nil {
pack.Snapshot = pbPack.Snapshot
pack.SnapshotVersionVector, err = FromVersionVector(pbPack.SnapshotVersionVector)
if err != nil {
return nil, err
}
}

return pack, nil
}

func fromCheckpoint(pbCheckpoint *api.Checkpoint) change.Checkpoint {
Expand Down Expand Up @@ -147,14 +156,41 @@ func fromChangeID(id *api.ChangeID) (change.ID, error) {
if err != nil {
return change.InitialID, err
}

vector, err := FromVersionVector(id.VersionVector)
if err != nil {
return change.InitialID, err
}

return change.NewID(
id.ClientSeq,
id.ServerSeq,
id.Lamport,
actorID,
vector,
), nil
}

// FromVersionVector converts the given Protobuf formats to model format.
func FromVersionVector(pbVersionVector *api.VersionVector) (time.VersionVector, error) {
versionVector := make(time.VersionVector)
// TODO(hackerwins): Old clients until v0.4.15 don't send VersionVector.
// Remove this check after all clients are updated to the v0.4.16 or later.
if pbVersionVector == nil {
return versionVector, nil
}

for id, lamport := range pbVersionVector.Vector {
actorID, err := time.ActorIDFromHex(id)
if err != nil {
return nil, err
}
versionVector.Set(actorID, lamport)
}

return versionVector, nil
}

// FromDocumentID converts the given Protobuf formats to model format.
func FromDocumentID(pbID string) (types.ID, error) {
id := types.ID(pbID)
Expand Down
39 changes: 33 additions & 6 deletions api/converter/to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,35 @@ func ToCheckpoint(cp change.Checkpoint) *api.Checkpoint {
}

// ToChangeID converts the given model format to Protobuf format.
func ToChangeID(id change.ID) *api.ChangeID {
func ToChangeID(id change.ID) (*api.ChangeID, error) {
pbVersionVector, err := ToVersionVector(id.VersionVector())
if err != nil {
return nil, err
}
return &api.ChangeID{
ClientSeq: id.ClientSeq(),
ServerSeq: id.ServerSeq(),
Lamport: id.Lamport(),
ActorId: id.ActorID().Bytes(),
ClientSeq: id.ClientSeq(),
ServerSeq: id.ServerSeq(),
Lamport: id.Lamport(),
ActorId: id.ActorID().Bytes(),
VersionVector: pbVersionVector,
}, nil
}

// ToVersionVector converts the given model format to Protobuf format.
func ToVersionVector(vector time.VersionVector) (*api.VersionVector, error) {
pbVersionVector := make(map[string]int64)
for actor, clock := range vector {
id, err := time.ActorIDFromBytes(actor[:])
if err != nil {
return nil, err
}

pbVersionVector[id.String()] = clock
}

return &api.VersionVector{
Vector: pbVersionVector,
}, nil
}

// ToDocEventType converts the given model format to Protobuf format.
Expand Down Expand Up @@ -241,8 +263,13 @@ func ToChanges(changes []*change.Change) ([]*api.Change, error) {
return nil, err
}

pbChangeID, err := ToChangeID(c.ID())
if err != nil {
return nil, err
}

pbChanges = append(pbChanges, &api.Change{
Id: ToChangeID(c.ID()),
Id: pbChangeID,
Message: c.Message(),
Operations: pbOperations,
PresenceChange: ToPresenceChange(c.PresenceChange()),
Expand Down
Loading
Loading