From b2fdcfc9370ad46c41d168f3d8853de24076dbeb Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Thu, 31 Oct 2024 17:27:15 -0400 Subject: [PATCH] Simplify MerkleCRDT interface Composite CRDTs (doc and the future collection crdt) are never Saved via the existing interface, and by removing the Save function from it we remove indirection, loss of type safety, and some overly defensive code. --- internal/db/collection.go | 2 +- internal/db/fetcher/versioned.go | 2 +- internal/db/merge.go | 2 +- internal/merkle/crdt/composite.go | 8 +------ internal/merkle/crdt/counter.go | 10 +++------ internal/merkle/crdt/errors.go | 35 ------------------------------ internal/merkle/crdt/lwwreg.go | 11 +++------- internal/merkle/crdt/merklecrdt.go | 10 ++++++--- 8 files changed, 17 insertions(+), 63 deletions(-) delete mode 100644 internal/merkle/crdt/errors.go diff --git a/internal/db/collection.go b/internal/db/collection.go index 3461fefd80..8d71c7aff6 100644 --- a/internal/db/collection.go +++ b/internal/db/collection.go @@ -664,7 +664,7 @@ func (c *collection) save( return err } - merkleCRDT, err := merklecrdt.InstanceWithStore( + merkleCRDT, err := merklecrdt.FieldLevelCRDTWithStore( txn, keys.NewCollectionSchemaVersionKey(c.Schema().VersionID, c.ID()), val.Type(), diff --git a/internal/db/fetcher/versioned.go b/internal/db/fetcher/versioned.go index b2294f57e5..6c7bd1dfd7 100644 --- a/internal/db/fetcher/versioned.go +++ b/internal/db/fetcher/versioned.go @@ -394,7 +394,7 @@ func (vf *VersionedFetcher) processBlock( ) (err error) { mcrdt, exists := vf.mCRDTs[field.ID] if !exists { - mcrdt, err = merklecrdt.InstanceWithStore( + mcrdt, err = merklecrdt.FieldLevelCRDTWithStore( vf.store, keys.CollectionSchemaVersionKey{}, field.Typ, diff --git a/internal/db/merge.go b/internal/db/merge.go index d1b96d5b77..74db1ad302 100644 --- a/internal/db/merge.go +++ b/internal/db/merge.go @@ -455,7 +455,7 @@ func (mp *mergeProcessor) initCRDTForType(field string) (merklecrdt.MerkleCRDT, return nil, nil } - mcrdt, err := merklecrdt.InstanceWithStore( + mcrdt, err := merklecrdt.FieldLevelCRDTWithStore( mp.txn, schemaVersionKey, fd.Typ, diff --git a/internal/merkle/crdt/composite.go b/internal/merkle/crdt/composite.go index 930be571e7..862541bf8f 100644 --- a/internal/merkle/crdt/composite.go +++ b/internal/merkle/crdt/composite.go @@ -66,13 +66,7 @@ func (m *MerkleCompositeDAG) Delete( } // Save the value of the composite CRDT to DAG. -func (m *MerkleCompositeDAG) Save(ctx context.Context, data any) (cidlink.Link, []byte, error) { - links, ok := data.([]coreblock.DAGLink) - if !ok { - return cidlink.Link{}, nil, NewErrUnexpectedValueType(client.COMPOSITE, []coreblock.DAGLink{}, data) - } - +func (m *MerkleCompositeDAG) Save(ctx context.Context, links []coreblock.DAGLink) (cidlink.Link, []byte, error) { delta := m.reg.Set(client.Active) - return m.clock.AddDelta(ctx, delta, links...) } diff --git a/internal/merkle/crdt/counter.go b/internal/merkle/crdt/counter.go index 50434ed7da..b5c45c4121 100644 --- a/internal/merkle/crdt/counter.go +++ b/internal/merkle/crdt/counter.go @@ -27,7 +27,7 @@ type MerkleCounter struct { reg crdt.Counter } -var _ MerkleCRDT = (*MerkleCounter)(nil) +var _ FieldLevelMerkleCRDT = (*MerkleCounter)(nil) // NewMerkleCounter creates a new instance (or loaded from DB) of a MerkleCRDT // backed by a Counter CRDT. @@ -53,12 +53,8 @@ func (m *MerkleCounter) Clock() *clock.MerkleClock { } // Save the value of the Counter to the DAG. -func (m *MerkleCounter) Save(ctx context.Context, data any) (cidlink.Link, []byte, error) { - value, ok := data.(*DocField) - if !ok { - return cidlink.Link{}, nil, NewErrUnexpectedValueType(m.reg.CType(), &client.FieldValue{}, data) - } - bytes, err := value.FieldValue.Bytes() +func (m *MerkleCounter) Save(ctx context.Context, data *DocField) (cidlink.Link, []byte, error) { + bytes, err := data.FieldValue.Bytes() if err != nil { return cidlink.Link{}, nil, err } diff --git a/internal/merkle/crdt/errors.go b/internal/merkle/crdt/errors.go deleted file mode 100644 index 58ee8b6bc4..0000000000 --- a/internal/merkle/crdt/errors.go +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright 2024 Democratized Data Foundation -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package merklecrdt - -import ( - "fmt" - - "github.com/sourcenetwork/defradb/client" - "github.com/sourcenetwork/defradb/errors" -) - -const ( - errUnexpectedValueType = "unexpected value type for merkle CRDT" -) - -var ( - ErrUnexpectedValueType = errors.New(errUnexpectedValueType) -) - -func NewErrUnexpectedValueType(cType client.CType, expected, actual any) error { - return errors.New( - errUnexpectedValueType, - errors.NewKV("CRDT", cType.String()), - errors.NewKV("expected", fmt.Sprintf("%T", expected)), - errors.NewKV("actual", fmt.Sprintf("%T", actual)), - ) -} diff --git a/internal/merkle/crdt/lwwreg.go b/internal/merkle/crdt/lwwreg.go index 18fc7ee35d..6dae0a1a31 100644 --- a/internal/merkle/crdt/lwwreg.go +++ b/internal/merkle/crdt/lwwreg.go @@ -15,7 +15,6 @@ import ( cidlink "github.com/ipld/go-ipld-prime/linking/cid" - "github.com/sourcenetwork/defradb/client" corecrdt "github.com/sourcenetwork/defradb/internal/core/crdt" "github.com/sourcenetwork/defradb/internal/keys" "github.com/sourcenetwork/defradb/internal/merkle/clock" @@ -27,7 +26,7 @@ type MerkleLWWRegister struct { reg corecrdt.LWWRegister } -var _ MerkleCRDT = (*MerkleLWWRegister)(nil) +var _ FieldLevelMerkleCRDT = (*MerkleLWWRegister)(nil) // NewMerkleLWWRegister creates a new instance (or loaded from DB) of a MerkleCRDT // backed by a LWWRegister CRDT. @@ -51,12 +50,8 @@ func (m *MerkleLWWRegister) Clock() *clock.MerkleClock { } // Save the value of the register to the DAG. -func (m *MerkleLWWRegister) Save(ctx context.Context, data any) (cidlink.Link, []byte, error) { - value, ok := data.(*DocField) - if !ok { - return cidlink.Link{}, nil, NewErrUnexpectedValueType(client.LWW_REGISTER, &client.FieldValue{}, data) - } - bytes, err := value.FieldValue.Bytes() +func (m *MerkleLWWRegister) Save(ctx context.Context, data *DocField) (cidlink.Link, []byte, error) { + bytes, err := data.FieldValue.Bytes() if err != nil { return cidlink.Link{}, nil, err } diff --git a/internal/merkle/crdt/merklecrdt.go b/internal/merkle/crdt/merklecrdt.go index 9b4cd019a8..911a81c10c 100644 --- a/internal/merkle/crdt/merklecrdt.go +++ b/internal/merkle/crdt/merklecrdt.go @@ -38,17 +38,21 @@ type Stores interface { // so it can be merged with any given semantics. type MerkleCRDT interface { Clock() *clock.MerkleClock - Save(ctx context.Context, data any) (cidlink.Link, []byte, error) } -func InstanceWithStore( +type FieldLevelMerkleCRDT interface { + MerkleCRDT + Save(ctx context.Context, data *DocField) (cidlink.Link, []byte, error) +} + +func FieldLevelCRDTWithStore( store Stores, schemaVersionKey keys.CollectionSchemaVersionKey, cType client.CType, kind client.FieldKind, key keys.DataStoreKey, fieldName string, -) (MerkleCRDT, error) { +) (FieldLevelMerkleCRDT, error) { switch cType { case client.LWW_REGISTER: return NewMerkleLWWRegister(