Skip to content

Commit

Permalink
WIP - Simplify MerkleCRDT interface
Browse files Browse the repository at this point in the history
Composite CRDTs (doc and the future collection crdt) are never Saved via the existing interface, and by removing the Save function from it we remove indirection, loss of type safety, and some overly defensive code.
  • Loading branch information
AndrewSisley committed Oct 31, 2024
1 parent 3b7598c commit ed52932
Show file tree
Hide file tree
Showing 8 changed files with 17 additions and 63 deletions.
2 changes: 1 addition & 1 deletion internal/db/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ func (c *collection) save(
return err
}

merkleCRDT, err := merklecrdt.InstanceWithStore(
merkleCRDT, err := merklecrdt.FieldLevelCRDTWithStore(
txn,
keys.NewCollectionSchemaVersionKey(c.Schema().VersionID, c.ID()),
val.Type(),
Expand Down
2 changes: 1 addition & 1 deletion internal/db/fetcher/versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func (vf *VersionedFetcher) processBlock(
) (err error) {
mcrdt, exists := vf.mCRDTs[field.ID]
if !exists {
mcrdt, err = merklecrdt.InstanceWithStore(
mcrdt, err = merklecrdt.FieldLevelCRDTWithStore(
vf.store,
keys.CollectionSchemaVersionKey{},
field.Typ,
Expand Down
2 changes: 1 addition & 1 deletion internal/db/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ func (mp *mergeProcessor) initCRDTForType(field string) (merklecrdt.MerkleCRDT,
return nil, nil
}

mcrdt, err := merklecrdt.InstanceWithStore(
mcrdt, err := merklecrdt.FieldLevelCRDTWithStore(
mp.txn,
schemaVersionKey,
fd.Typ,
Expand Down
8 changes: 1 addition & 7 deletions internal/merkle/crdt/composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,7 @@ func (m *MerkleCompositeDAG) Delete(
}

// Save the value of the composite CRDT to DAG.
func (m *MerkleCompositeDAG) Save(ctx context.Context, data any) (cidlink.Link, []byte, error) {
links, ok := data.([]coreblock.DAGLink)
if !ok {
return cidlink.Link{}, nil, NewErrUnexpectedValueType(client.COMPOSITE, []coreblock.DAGLink{}, data)
}

func (m *MerkleCompositeDAG) Save(ctx context.Context, links []coreblock.DAGLink) (cidlink.Link, []byte, error) {
delta := m.reg.Set(client.Active)

return m.clock.AddDelta(ctx, delta, links...)
}
10 changes: 3 additions & 7 deletions internal/merkle/crdt/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type MerkleCounter struct {
reg crdt.Counter
}

var _ MerkleCRDT = (*MerkleCounter)(nil)
var _ FieldLevelMerkleCRDT = (*MerkleCounter)(nil)

// NewMerkleCounter creates a new instance (or loaded from DB) of a MerkleCRDT
// backed by a Counter CRDT.
Expand All @@ -53,12 +53,8 @@ func (m *MerkleCounter) Clock() *clock.MerkleClock {
}

// Save the value of the Counter to the DAG.
func (m *MerkleCounter) Save(ctx context.Context, data any) (cidlink.Link, []byte, error) {
value, ok := data.(*DocField)
if !ok {
return cidlink.Link{}, nil, NewErrUnexpectedValueType(m.reg.CType(), &client.FieldValue{}, data)
}
bytes, err := value.FieldValue.Bytes()
func (m *MerkleCounter) Save(ctx context.Context, data *DocField) (cidlink.Link, []byte, error) {
bytes, err := data.FieldValue.Bytes()
if err != nil {
return cidlink.Link{}, nil, err
}
Expand Down
35 changes: 0 additions & 35 deletions internal/merkle/crdt/errors.go

This file was deleted.

11 changes: 3 additions & 8 deletions internal/merkle/crdt/lwwreg.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

cidlink "github.com/ipld/go-ipld-prime/linking/cid"

"github.com/sourcenetwork/defradb/client"
corecrdt "github.com/sourcenetwork/defradb/internal/core/crdt"
"github.com/sourcenetwork/defradb/internal/keys"
"github.com/sourcenetwork/defradb/internal/merkle/clock"
Expand All @@ -27,7 +26,7 @@ type MerkleLWWRegister struct {
reg corecrdt.LWWRegister
}

var _ MerkleCRDT = (*MerkleLWWRegister)(nil)
var _ FieldLevelMerkleCRDT = (*MerkleLWWRegister)(nil)

// NewMerkleLWWRegister creates a new instance (or loaded from DB) of a MerkleCRDT
// backed by a LWWRegister CRDT.
Expand All @@ -51,12 +50,8 @@ func (m *MerkleLWWRegister) Clock() *clock.MerkleClock {
}

// Save the value of the register to the DAG.
func (m *MerkleLWWRegister) Save(ctx context.Context, data any) (cidlink.Link, []byte, error) {
value, ok := data.(*DocField)
if !ok {
return cidlink.Link{}, nil, NewErrUnexpectedValueType(client.LWW_REGISTER, &client.FieldValue{}, data)
}
bytes, err := value.FieldValue.Bytes()
func (m *MerkleLWWRegister) Save(ctx context.Context, data *DocField) (cidlink.Link, []byte, error) {
bytes, err := data.FieldValue.Bytes()
if err != nil {
return cidlink.Link{}, nil, err
}
Expand Down
10 changes: 7 additions & 3 deletions internal/merkle/crdt/merklecrdt.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,21 @@ type Stores interface {
// so it can be merged with any given semantics.
type MerkleCRDT interface {
Clock() *clock.MerkleClock
Save(ctx context.Context, data any) (cidlink.Link, []byte, error)
}

func InstanceWithStore(
type FieldLevelMerkleCRDT interface {
MerkleCRDT
Save(ctx context.Context, data *DocField) (cidlink.Link, []byte, error)
}

func FieldLevelCRDTWithStore(
store Stores,
schemaVersionKey keys.CollectionSchemaVersionKey,
cType client.CType,
kind client.FieldKind,
key keys.DataStoreKey,
fieldName string,
) (MerkleCRDT, error) {
) (FieldLevelMerkleCRDT, error) {
switch cType {
case client.LWW_REGISTER:
return NewMerkleLWWRegister(
Expand Down

0 comments on commit ed52932

Please sign in to comment.