Skip to content

Commit

Permalink
WIP - Remove baseMerkleCRDT
Browse files Browse the repository at this point in the history
Was causing so much indirection and seriously inhibited the readability of the code by losing the known type in the Merge function.  Now we can at least go from merkle.lwwr.Merge to core.lwwr.Merge without chasing down all implementations of the interface and wondering if they can all form part of a merkle.lwwr...
  • Loading branch information
AndrewSisley committed Oct 28, 2024
1 parent 750f782 commit df64796
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 95 deletions.
17 changes: 13 additions & 4 deletions internal/merkle/crdt/composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ import (

// MerkleCompositeDAG is a MerkleCRDT implementation of the CompositeDAG using MerkleClocks.
type MerkleCompositeDAG struct {
*baseMerkleCRDT
clock MerkleClock
// core.ReplicatedData
reg corecrdt.CompositeDAG
}

var _ MerkleCRDT = (*MerkleCompositeDAG)(nil)

// NewMerkleCompositeDAG creates a new instance (or loaded from DB) of a MerkleCRDT
// backed by a CompositeDAG CRDT.
func NewMerkleCompositeDAG(
Expand All @@ -44,14 +46,21 @@ func NewMerkleCompositeDAG(

clock := clock.NewMerkleClock(store.Headstore(), store.Blockstore(), store.Encstore(),
key.ToHeadStoreKey(), compositeDag)
base := &baseMerkleCRDT{clock: clock, crdt: compositeDag}

return &MerkleCompositeDAG{
baseMerkleCRDT: base,
reg: compositeDag,
clock: clock,
reg: compositeDag,
}
}

func (m *MerkleCompositeDAG) Clock() MerkleClock {
return m.clock
}

func (m *MerkleCompositeDAG) Merge(ctx context.Context, other core.Delta) error {
return m.reg.Merge(ctx, other)
}

// Delete sets the values of CompositeDAG for a delete.
func (m *MerkleCompositeDAG) Delete(
ctx context.Context,
Expand Down
29 changes: 19 additions & 10 deletions internal/merkle/crdt/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ import (

// MerkleCounter is a MerkleCRDT implementation of the Counter using MerkleClocks.
type MerkleCounter struct {
*baseMerkleCRDT

reg crdt.Counter
clock MerkleClock
reg crdt.Counter
}

var _ MerkleCRDT = (*MerkleCounter)(nil)

// NewMerkleCounter creates a new instance (or loaded from DB) of a MerkleCRDT
// backed by a Counter CRDT.
func NewMerkleCounter(
Expand All @@ -40,26 +41,34 @@ func NewMerkleCounter(
) *MerkleCounter {
register := crdt.NewCounter(store.Datastore(), schemaVersionKey, key, fieldName, allowDecrement, kind)
clk := clock.NewMerkleClock(store.Headstore(), store.Blockstore(), store.Encstore(), key.ToHeadStoreKey(), register)
base := &baseMerkleCRDT{clock: clk, crdt: register}

return &MerkleCounter{
baseMerkleCRDT: base,
reg: register,
clock: clk,
reg: register,
}
}

func (m *MerkleCounter) Clock() MerkleClock {
return m.clock
}

func (m *MerkleCounter) Merge(ctx context.Context, other core.Delta) error {
return m.reg.Merge(ctx, other)
}

// Save the value of the Counter to the DAG.
func (mc *MerkleCounter) Save(ctx context.Context, data any) (cidlink.Link, []byte, error) {
func (m *MerkleCounter) Save(ctx context.Context, data any) (cidlink.Link, []byte, error) {
value, ok := data.(*DocField)
if !ok {
return cidlink.Link{}, nil, NewErrUnexpectedValueType(mc.reg.CType(), &client.FieldValue{}, data)
return cidlink.Link{}, nil, NewErrUnexpectedValueType(m.reg.CType(), &client.FieldValue{}, data)
}
bytes, err := value.FieldValue.Bytes()
if err != nil {
return cidlink.Link{}, nil, err
}
delta, err := mc.reg.Increment(ctx, bytes)
delta, err := m.reg.Increment(ctx, bytes)
if err != nil {
return cidlink.Link{}, nil, err
}
return mc.clock.AddDelta(ctx, delta)
return m.clock.AddDelta(ctx, delta)
}
27 changes: 18 additions & 9 deletions internal/merkle/crdt/lwwreg.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ import (

// MerkleLWWRegister is a MerkleCRDT implementation of the LWWRegister using MerkleClocks.
type MerkleLWWRegister struct {
*baseMerkleCRDT

reg corecrdt.LWWRegister
clock MerkleClock
reg corecrdt.LWWRegister
}

var _ MerkleCRDT = (*MerkleLWWRegister)(nil)

// NewMerkleLWWRegister creates a new instance (or loaded from DB) of a MerkleCRDT
// backed by a LWWRegister CRDT.
func NewMerkleLWWRegister(
Expand All @@ -38,15 +39,23 @@ func NewMerkleLWWRegister(
) *MerkleLWWRegister {
register := corecrdt.NewLWWRegister(store.Datastore(), schemaVersionKey, key, fieldName)
clk := clock.NewMerkleClock(store.Headstore(), store.Blockstore(), store.Encstore(), key.ToHeadStoreKey(), register)
base := &baseMerkleCRDT{clock: clk, crdt: register}

return &MerkleLWWRegister{
baseMerkleCRDT: base,
reg: register,
clock: clk,
reg: register,
}
}

func (m *MerkleLWWRegister) Clock() MerkleClock {
return m.clock
}

func (m *MerkleLWWRegister) Merge(ctx context.Context, other core.Delta) error {
return m.reg.Merge(ctx, other)
}

// Save the value of the register to the DAG.
func (mlwwreg *MerkleLWWRegister) Save(ctx context.Context, data any) (cidlink.Link, []byte, error) {
func (m *MerkleLWWRegister) Save(ctx context.Context, data any) (cidlink.Link, []byte, error) {
value, ok := data.(*DocField)
if !ok {
return cidlink.Link{}, nil, NewErrUnexpectedValueType(client.LWW_REGISTER, &client.FieldValue{}, data)
Expand All @@ -58,6 +67,6 @@ func (mlwwreg *MerkleLWWRegister) Save(ctx context.Context, data any) (cidlink.L

// Set() call on underlying LWWRegister CRDT
// persist/publish delta
delta := mlwwreg.reg.Set(bytes)
return mlwwreg.clock.AddDelta(ctx, delta)
delta := m.reg.Set(bytes)
return m.clock.AddDelta(ctx, delta)
}
17 changes: 0 additions & 17 deletions internal/merkle/crdt/merklecrdt.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,23 +52,6 @@ type MerkleClock interface {
ProcessBlock(ctx context.Context, block *coreblock.Block, cid cidlink.Link) error
}

// baseMerkleCRDT handles the MerkleCRDT overhead functions that aren't CRDT specific like the mutations and state
// retrieval functions. It handles creating and publishing the CRDT DAG with the help of the MerkleClock.
type baseMerkleCRDT struct {
clock MerkleClock
crdt core.ReplicatedData
}

var _ core.ReplicatedData = (*baseMerkleCRDT)(nil)

func (base *baseMerkleCRDT) Clock() MerkleClock {
return base.clock
}

func (base *baseMerkleCRDT) Merge(ctx context.Context, other core.Delta) error {
return base.crdt.Merge(ctx, other)
}

func InstanceWithStore(
store Stores,
schemaVersionKey core.CollectionSchemaVersionKey,
Expand Down
55 changes: 0 additions & 55 deletions internal/merkle/crdt/merklecrdt_test.go

This file was deleted.

0 comments on commit df64796

Please sign in to comment.