diff --git a/internal/db/collection.go b/internal/db/collection.go index 3461fefd80..8d71c7aff6 100644 --- a/internal/db/collection.go +++ b/internal/db/collection.go @@ -664,7 +664,7 @@ func (c *collection) save( return err } - merkleCRDT, err := merklecrdt.InstanceWithStore( + merkleCRDT, err := merklecrdt.FieldLevelCRDTWithStore( txn, keys.NewCollectionSchemaVersionKey(c.Schema().VersionID, c.ID()), val.Type(), diff --git a/internal/db/fetcher/versioned.go b/internal/db/fetcher/versioned.go index b2294f57e5..6c7bd1dfd7 100644 --- a/internal/db/fetcher/versioned.go +++ b/internal/db/fetcher/versioned.go @@ -394,7 +394,7 @@ func (vf *VersionedFetcher) processBlock( ) (err error) { mcrdt, exists := vf.mCRDTs[field.ID] if !exists { - mcrdt, err = merklecrdt.InstanceWithStore( + mcrdt, err = merklecrdt.FieldLevelCRDTWithStore( vf.store, keys.CollectionSchemaVersionKey{}, field.Typ, diff --git a/internal/db/merge.go b/internal/db/merge.go index d1b96d5b77..74db1ad302 100644 --- a/internal/db/merge.go +++ b/internal/db/merge.go @@ -455,7 +455,7 @@ func (mp *mergeProcessor) initCRDTForType(field string) (merklecrdt.MerkleCRDT, return nil, nil } - mcrdt, err := merklecrdt.InstanceWithStore( + mcrdt, err := merklecrdt.FieldLevelCRDTWithStore( mp.txn, schemaVersionKey, fd.Typ, diff --git a/internal/merkle/crdt/composite.go b/internal/merkle/crdt/composite.go index 930be571e7..862541bf8f 100644 --- a/internal/merkle/crdt/composite.go +++ b/internal/merkle/crdt/composite.go @@ -66,13 +66,7 @@ func (m *MerkleCompositeDAG) Delete( } // Save the value of the composite CRDT to DAG. -func (m *MerkleCompositeDAG) Save(ctx context.Context, data any) (cidlink.Link, []byte, error) { - links, ok := data.([]coreblock.DAGLink) - if !ok { - return cidlink.Link{}, nil, NewErrUnexpectedValueType(client.COMPOSITE, []coreblock.DAGLink{}, data) - } - +func (m *MerkleCompositeDAG) Save(ctx context.Context, links []coreblock.DAGLink) (cidlink.Link, []byte, error) { delta := m.reg.Set(client.Active) - return m.clock.AddDelta(ctx, delta, links...) } diff --git a/internal/merkle/crdt/counter.go b/internal/merkle/crdt/counter.go index 50434ed7da..b5c45c4121 100644 --- a/internal/merkle/crdt/counter.go +++ b/internal/merkle/crdt/counter.go @@ -27,7 +27,7 @@ type MerkleCounter struct { reg crdt.Counter } -var _ MerkleCRDT = (*MerkleCounter)(nil) +var _ FieldLevelMerkleCRDT = (*MerkleCounter)(nil) // NewMerkleCounter creates a new instance (or loaded from DB) of a MerkleCRDT // backed by a Counter CRDT. @@ -53,12 +53,8 @@ func (m *MerkleCounter) Clock() *clock.MerkleClock { } // Save the value of the Counter to the DAG. -func (m *MerkleCounter) Save(ctx context.Context, data any) (cidlink.Link, []byte, error) { - value, ok := data.(*DocField) - if !ok { - return cidlink.Link{}, nil, NewErrUnexpectedValueType(m.reg.CType(), &client.FieldValue{}, data) - } - bytes, err := value.FieldValue.Bytes() +func (m *MerkleCounter) Save(ctx context.Context, data *DocField) (cidlink.Link, []byte, error) { + bytes, err := data.FieldValue.Bytes() if err != nil { return cidlink.Link{}, nil, err } diff --git a/internal/merkle/crdt/errors.go b/internal/merkle/crdt/errors.go deleted file mode 100644 index 58ee8b6bc4..0000000000 --- a/internal/merkle/crdt/errors.go +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright 2024 Democratized Data Foundation -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package merklecrdt - -import ( - "fmt" - - "github.com/sourcenetwork/defradb/client" - "github.com/sourcenetwork/defradb/errors" -) - -const ( - errUnexpectedValueType = "unexpected value type for merkle CRDT" -) - -var ( - ErrUnexpectedValueType = errors.New(errUnexpectedValueType) -) - -func NewErrUnexpectedValueType(cType client.CType, expected, actual any) error { - return errors.New( - errUnexpectedValueType, - errors.NewKV("CRDT", cType.String()), - errors.NewKV("expected", fmt.Sprintf("%T", expected)), - errors.NewKV("actual", fmt.Sprintf("%T", actual)), - ) -} diff --git a/internal/merkle/crdt/lwwreg.go b/internal/merkle/crdt/lwwreg.go index 18fc7ee35d..6dae0a1a31 100644 --- a/internal/merkle/crdt/lwwreg.go +++ b/internal/merkle/crdt/lwwreg.go @@ -15,7 +15,6 @@ import ( cidlink "github.com/ipld/go-ipld-prime/linking/cid" - "github.com/sourcenetwork/defradb/client" corecrdt "github.com/sourcenetwork/defradb/internal/core/crdt" "github.com/sourcenetwork/defradb/internal/keys" "github.com/sourcenetwork/defradb/internal/merkle/clock" @@ -27,7 +26,7 @@ type MerkleLWWRegister struct { reg corecrdt.LWWRegister } -var _ MerkleCRDT = (*MerkleLWWRegister)(nil) +var _ FieldLevelMerkleCRDT = (*MerkleLWWRegister)(nil) // NewMerkleLWWRegister creates a new instance (or loaded from DB) of a MerkleCRDT // backed by a LWWRegister CRDT. @@ -51,12 +50,8 @@ func (m *MerkleLWWRegister) Clock() *clock.MerkleClock { } // Save the value of the register to the DAG. -func (m *MerkleLWWRegister) Save(ctx context.Context, data any) (cidlink.Link, []byte, error) { - value, ok := data.(*DocField) - if !ok { - return cidlink.Link{}, nil, NewErrUnexpectedValueType(client.LWW_REGISTER, &client.FieldValue{}, data) - } - bytes, err := value.FieldValue.Bytes() +func (m *MerkleLWWRegister) Save(ctx context.Context, data *DocField) (cidlink.Link, []byte, error) { + bytes, err := data.FieldValue.Bytes() if err != nil { return cidlink.Link{}, nil, err } diff --git a/internal/merkle/crdt/merklecrdt.go b/internal/merkle/crdt/merklecrdt.go index 9b4cd019a8..911a81c10c 100644 --- a/internal/merkle/crdt/merklecrdt.go +++ b/internal/merkle/crdt/merklecrdt.go @@ -38,17 +38,21 @@ type Stores interface { // so it can be merged with any given semantics. type MerkleCRDT interface { Clock() *clock.MerkleClock - Save(ctx context.Context, data any) (cidlink.Link, []byte, error) } -func InstanceWithStore( +type FieldLevelMerkleCRDT interface { + MerkleCRDT + Save(ctx context.Context, data *DocField) (cidlink.Link, []byte, error) +} + +func FieldLevelCRDTWithStore( store Stores, schemaVersionKey keys.CollectionSchemaVersionKey, cType client.CType, kind client.FieldKind, key keys.DataStoreKey, fieldName string, -) (MerkleCRDT, error) { +) (FieldLevelMerkleCRDT, error) { switch cType { case client.LWW_REGISTER: return NewMerkleLWWRegister(