From df647969cd28c96bd6a12f6804420abab4868145 Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Mon, 28 Oct 2024 14:03:21 -0400 Subject: [PATCH] WIP - Remove baseMerkleCRDT Was causing so much indirection and seriously inhibited the readability of the code by losing the known type in the Merge function. Now we can at least go from merkle.lwwr.Merge to core.lwwr.Merge without chasing down all implementations of the interface and wondering if they can all form part of a merkle.lwwr... --- internal/merkle/crdt/composite.go | 17 ++++++-- internal/merkle/crdt/counter.go | 29 ++++++++----- internal/merkle/crdt/lwwreg.go | 27 ++++++++---- internal/merkle/crdt/merklecrdt.go | 17 -------- internal/merkle/crdt/merklecrdt_test.go | 55 ------------------------- 5 files changed, 50 insertions(+), 95 deletions(-) delete mode 100644 internal/merkle/crdt/merklecrdt_test.go diff --git a/internal/merkle/crdt/composite.go b/internal/merkle/crdt/composite.go index bf277dddba..dd0450d04c 100644 --- a/internal/merkle/crdt/composite.go +++ b/internal/merkle/crdt/composite.go @@ -24,11 +24,13 @@ import ( // MerkleCompositeDAG is a MerkleCRDT implementation of the CompositeDAG using MerkleClocks. type MerkleCompositeDAG struct { - *baseMerkleCRDT + clock MerkleClock // core.ReplicatedData reg corecrdt.CompositeDAG } +var _ MerkleCRDT = (*MerkleCompositeDAG)(nil) + // NewMerkleCompositeDAG creates a new instance (or loaded from DB) of a MerkleCRDT // backed by a CompositeDAG CRDT. func NewMerkleCompositeDAG( @@ -44,14 +46,21 @@ func NewMerkleCompositeDAG( clock := clock.NewMerkleClock(store.Headstore(), store.Blockstore(), store.Encstore(), key.ToHeadStoreKey(), compositeDag) - base := &baseMerkleCRDT{clock: clock, crdt: compositeDag} return &MerkleCompositeDAG{ - baseMerkleCRDT: base, - reg: compositeDag, + clock: clock, + reg: compositeDag, } } +func (m *MerkleCompositeDAG) Clock() MerkleClock { + return m.clock +} + +func (m *MerkleCompositeDAG) Merge(ctx context.Context, other core.Delta) error { + return m.reg.Merge(ctx, other) +} + // Delete sets the values of CompositeDAG for a delete. func (m *MerkleCompositeDAG) Delete( ctx context.Context, diff --git a/internal/merkle/crdt/counter.go b/internal/merkle/crdt/counter.go index 21b26785b6..c91a19f030 100644 --- a/internal/merkle/crdt/counter.go +++ b/internal/merkle/crdt/counter.go @@ -23,11 +23,12 @@ import ( // MerkleCounter is a MerkleCRDT implementation of the Counter using MerkleClocks. type MerkleCounter struct { - *baseMerkleCRDT - - reg crdt.Counter + clock MerkleClock + reg crdt.Counter } +var _ MerkleCRDT = (*MerkleCounter)(nil) + // NewMerkleCounter creates a new instance (or loaded from DB) of a MerkleCRDT // backed by a Counter CRDT. func NewMerkleCounter( @@ -40,26 +41,34 @@ func NewMerkleCounter( ) *MerkleCounter { register := crdt.NewCounter(store.Datastore(), schemaVersionKey, key, fieldName, allowDecrement, kind) clk := clock.NewMerkleClock(store.Headstore(), store.Blockstore(), store.Encstore(), key.ToHeadStoreKey(), register) - base := &baseMerkleCRDT{clock: clk, crdt: register} + return &MerkleCounter{ - baseMerkleCRDT: base, - reg: register, + clock: clk, + reg: register, } } +func (m *MerkleCounter) Clock() MerkleClock { + return m.clock +} + +func (m *MerkleCounter) Merge(ctx context.Context, other core.Delta) error { + return m.reg.Merge(ctx, other) +} + // Save the value of the Counter to the DAG. -func (mc *MerkleCounter) Save(ctx context.Context, data any) (cidlink.Link, []byte, error) { +func (m *MerkleCounter) Save(ctx context.Context, data any) (cidlink.Link, []byte, error) { value, ok := data.(*DocField) if !ok { - return cidlink.Link{}, nil, NewErrUnexpectedValueType(mc.reg.CType(), &client.FieldValue{}, data) + return cidlink.Link{}, nil, NewErrUnexpectedValueType(m.reg.CType(), &client.FieldValue{}, data) } bytes, err := value.FieldValue.Bytes() if err != nil { return cidlink.Link{}, nil, err } - delta, err := mc.reg.Increment(ctx, bytes) + delta, err := m.reg.Increment(ctx, bytes) if err != nil { return cidlink.Link{}, nil, err } - return mc.clock.AddDelta(ctx, delta) + return m.clock.AddDelta(ctx, delta) } diff --git a/internal/merkle/crdt/lwwreg.go b/internal/merkle/crdt/lwwreg.go index 00c70dc4a9..adbbd1e1e0 100644 --- a/internal/merkle/crdt/lwwreg.go +++ b/internal/merkle/crdt/lwwreg.go @@ -23,11 +23,12 @@ import ( // MerkleLWWRegister is a MerkleCRDT implementation of the LWWRegister using MerkleClocks. type MerkleLWWRegister struct { - *baseMerkleCRDT - - reg corecrdt.LWWRegister + clock MerkleClock + reg corecrdt.LWWRegister } +var _ MerkleCRDT = (*MerkleLWWRegister)(nil) + // NewMerkleLWWRegister creates a new instance (or loaded from DB) of a MerkleCRDT // backed by a LWWRegister CRDT. func NewMerkleLWWRegister( @@ -38,15 +39,23 @@ func NewMerkleLWWRegister( ) *MerkleLWWRegister { register := corecrdt.NewLWWRegister(store.Datastore(), schemaVersionKey, key, fieldName) clk := clock.NewMerkleClock(store.Headstore(), store.Blockstore(), store.Encstore(), key.ToHeadStoreKey(), register) - base := &baseMerkleCRDT{clock: clk, crdt: register} + return &MerkleLWWRegister{ - baseMerkleCRDT: base, - reg: register, + clock: clk, + reg: register, } } +func (m *MerkleLWWRegister) Clock() MerkleClock { + return m.clock +} + +func (m *MerkleLWWRegister) Merge(ctx context.Context, other core.Delta) error { + return m.reg.Merge(ctx, other) +} + // Save the value of the register to the DAG. -func (mlwwreg *MerkleLWWRegister) Save(ctx context.Context, data any) (cidlink.Link, []byte, error) { +func (m *MerkleLWWRegister) Save(ctx context.Context, data any) (cidlink.Link, []byte, error) { value, ok := data.(*DocField) if !ok { return cidlink.Link{}, nil, NewErrUnexpectedValueType(client.LWW_REGISTER, &client.FieldValue{}, data) @@ -58,6 +67,6 @@ func (mlwwreg *MerkleLWWRegister) Save(ctx context.Context, data any) (cidlink.L // Set() call on underlying LWWRegister CRDT // persist/publish delta - delta := mlwwreg.reg.Set(bytes) - return mlwwreg.clock.AddDelta(ctx, delta) + delta := m.reg.Set(bytes) + return m.clock.AddDelta(ctx, delta) } diff --git a/internal/merkle/crdt/merklecrdt.go b/internal/merkle/crdt/merklecrdt.go index 14b75cc7d7..d78453ec00 100644 --- a/internal/merkle/crdt/merklecrdt.go +++ b/internal/merkle/crdt/merklecrdt.go @@ -52,23 +52,6 @@ type MerkleClock interface { ProcessBlock(ctx context.Context, block *coreblock.Block, cid cidlink.Link) error } -// baseMerkleCRDT handles the MerkleCRDT overhead functions that aren't CRDT specific like the mutations and state -// retrieval functions. It handles creating and publishing the CRDT DAG with the help of the MerkleClock. -type baseMerkleCRDT struct { - clock MerkleClock - crdt core.ReplicatedData -} - -var _ core.ReplicatedData = (*baseMerkleCRDT)(nil) - -func (base *baseMerkleCRDT) Clock() MerkleClock { - return base.clock -} - -func (base *baseMerkleCRDT) Merge(ctx context.Context, other core.Delta) error { - return base.crdt.Merge(ctx, other) -} - func InstanceWithStore( store Stores, schemaVersionKey core.CollectionSchemaVersionKey, diff --git a/internal/merkle/crdt/merklecrdt_test.go b/internal/merkle/crdt/merklecrdt_test.go deleted file mode 100644 index 74f4814ca3..0000000000 --- a/internal/merkle/crdt/merklecrdt_test.go +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright 2022 Democratized Data Foundation -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package merklecrdt - -import ( - "context" - "testing" - - "github.com/ipfs/go-cid" - ds "github.com/ipfs/go-datastore" - - "github.com/sourcenetwork/defradb/datastore" - "github.com/sourcenetwork/defradb/internal/core" - crdt "github.com/sourcenetwork/defradb/internal/core/crdt" - "github.com/sourcenetwork/defradb/internal/merkle/clock" -) - -func newDS() ds.Datastore { - return ds.NewMapDatastore() -} - -func newTestBaseMerkleCRDT() (*baseMerkleCRDT, datastore.DSReaderWriter) { - s := newDS() - multistore := datastore.MultiStoreFrom(s) - - reg := crdt.NewLWWRegister(multistore.Datastore(), core.CollectionSchemaVersionKey{}, core.DataStoreKey{}, "") - clk := clock.NewMerkleClock(multistore.Headstore(), multistore.Blockstore(), multistore.Encstore(), core.HeadStoreKey{}, reg) - return &baseMerkleCRDT{clock: clk, crdt: reg}, multistore.Rootstore() -} - -func TestMerkleCRDTPublish(t *testing.T) { - ctx := context.Background() - bCRDT, _ := newTestBaseMerkleCRDT() - reg := crdt.LWWRegister{} - delta := reg.Set([]byte("test")) - - link, _, err := bCRDT.clock.AddDelta(ctx, delta) - if err != nil { - t.Error("Failed to publish delta to MerkleCRDT:", err) - return - } - - if link.Cid == cid.Undef { - t.Error("Published returned invalid CID Undef:", link.Cid) - return - } -}