diff --git a/internal/db/base/collection_keys.go b/internal/db/base/collection_keys.go index 31cdeef18c..015a2aeee3 100644 --- a/internal/db/base/collection_keys.go +++ b/internal/db/base/collection_keys.go @@ -11,10 +11,7 @@ package base import ( - "fmt" - "github.com/sourcenetwork/defradb/client" - "github.com/sourcenetwork/defradb/internal/core" "github.com/sourcenetwork/defradb/internal/keys" ) @@ -35,29 +32,3 @@ func MakeDataStoreKeyWithCollectionAndDocID( DocID: docID, } } - -func MakePrimaryIndexKeyForCRDT( - c client.CollectionDefinition, - ctype client.CType, - key keys.DataStoreKey, - fieldName string, -) (keys.DataStoreKey, error) { - switch ctype { - case client.COMPOSITE: - return MakeDataStoreKeyWithCollectionDescription(c.Description). - WithInstanceInfo(key). - WithFieldID(core.COMPOSITE_NAMESPACE), - nil - case client.LWW_REGISTER, client.PN_COUNTER, client.P_COUNTER: - field, ok := c.GetFieldByName(fieldName) - if !ok { - return keys.DataStoreKey{}, client.NewErrFieldNotExist(fieldName) - } - - return MakeDataStoreKeyWithCollectionDescription(c.Description). - WithInstanceInfo(key). - WithFieldID(fmt.Sprint(field.ID)), - nil - } - return keys.DataStoreKey{}, ErrInvalidCrdtType -} diff --git a/internal/db/fetcher/versioned.go b/internal/db/fetcher/versioned.go index 508b0ea406..b2294f57e5 100644 --- a/internal/db/fetcher/versioned.go +++ b/internal/db/fetcher/versioned.go @@ -13,6 +13,7 @@ package fetcher import ( "container/list" "context" + "fmt" "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" @@ -27,7 +28,6 @@ import ( "github.com/sourcenetwork/defradb/datastore/memory" "github.com/sourcenetwork/defradb/internal/core" coreblock "github.com/sourcenetwork/defradb/internal/core/block" - "github.com/sourcenetwork/defradb/internal/db/base" "github.com/sourcenetwork/defradb/internal/keys" merklecrdt "github.com/sourcenetwork/defradb/internal/merkle/crdt" "github.com/sourcenetwork/defradb/internal/planner/mapper" @@ -99,7 +99,7 @@ type VersionedFetcher struct { col client.Collection // @todo index *client.IndexDescription - mCRDTs map[uint32]merklecrdt.MerkleCRDT + mCRDTs map[client.FieldID]merklecrdt.MerkleCRDT } // Init initializes the VersionedFetcher. @@ -118,7 +118,7 @@ func (vf *VersionedFetcher) Init( vf.acp = acp vf.col = col vf.queuedCids = list.New() - vf.mCRDTs = make(map[uint32]merklecrdt.MerkleCRDT) + vf.mCRDTs = make(map[client.FieldID]merklecrdt.MerkleCRDT) vf.txn = txn // create store @@ -182,7 +182,7 @@ func (vf *VersionedFetcher) Start(ctx context.Context, spans core.Spans) error { } vf.ctx = ctx - vf.dsKey = dk + vf.dsKey = dk.WithCollectionRoot(vf.col.Description().RootID) vf.version = c if err := vf.seekTo(vf.version); err != nil { @@ -352,7 +352,17 @@ func (vf *VersionedFetcher) merge(c cid.Cid) error { } // first arg 0 is the index for the composite DAG in the mCRDTs cache - if err := vf.processBlock(0, block, link, client.COMPOSITE, client.FieldKind_None, ""); err != nil { + mcrdt, exists := vf.mCRDTs[0] + if !exists { + mcrdt = merklecrdt.NewMerkleCompositeDAG( + vf.store, + keys.CollectionSchemaVersionKey{}, + vf.dsKey.WithFieldID(core.COMPOSITE_NAMESPACE), + ) + vf.mCRDTs[0] = mcrdt + } + err = mcrdt.Clock().ProcessBlock(vf.ctx, block, link) + if err != nil { return err } @@ -368,7 +378,7 @@ func (vf *VersionedFetcher) merge(c cid.Cid) error { if !ok { return client.NewErrFieldNotExist(l.Name) } - if err := vf.processBlock(uint32(field.ID), subBlock, l.Link, field.Typ, field.Kind, l.Name); err != nil { + if err := vf.processBlock(subBlock, l.Link, field, l.Name); err != nil { return err } } @@ -377,32 +387,25 @@ func (vf *VersionedFetcher) merge(c cid.Cid) error { } func (vf *VersionedFetcher) processBlock( - crdtIndex uint32, block *coreblock.Block, blockLink cidlink.Link, - ctype client.CType, - kind client.FieldKind, + field client.FieldDefinition, fieldName string, ) (err error) { - // handle CompositeDAG - mcrdt, exists := vf.mCRDTs[crdtIndex] + mcrdt, exists := vf.mCRDTs[field.ID] if !exists { - dsKey, err := base.MakePrimaryIndexKeyForCRDT(vf.col.Definition(), ctype, vf.dsKey, fieldName) - if err != nil { - return err - } mcrdt, err = merklecrdt.InstanceWithStore( vf.store, keys.CollectionSchemaVersionKey{}, - ctype, - kind, - dsKey, + field.Typ, + field.Kind, + vf.dsKey.WithFieldID(fmt.Sprint(field.ID)), fieldName, ) if err != nil { return err } - vf.mCRDTs[crdtIndex] = mcrdt + vf.mCRDTs[field.ID] = mcrdt } return mcrdt.Clock().ProcessBlock(vf.ctx, block, blockLink) diff --git a/internal/keys/datastore_doc.go b/internal/keys/datastore_doc.go index 37848c148f..258357de83 100644 --- a/internal/keys/datastore_doc.go +++ b/internal/keys/datastore_doc.go @@ -86,6 +86,12 @@ func (k DataStoreKey) WithDeletedFlag() DataStoreKey { return newKey } +func (k DataStoreKey) WithCollectionRoot(colRoot uint32) DataStoreKey { + newKey := k + newKey.CollectionRootID = colRoot + return newKey +} + func (k DataStoreKey) WithDocID(docID string) DataStoreKey { newKey := k newKey.DocID = docID diff --git a/internal/merkle/crdt/merklecrdt.go b/internal/merkle/crdt/merklecrdt.go index 3dd47ad0dc..9b4cd019a8 100644 --- a/internal/merkle/crdt/merklecrdt.go +++ b/internal/merkle/crdt/merklecrdt.go @@ -66,12 +66,6 @@ func InstanceWithStore( cType == client.PN_COUNTER, kind.(client.ScalarKind), ), nil - case client.COMPOSITE: - return NewMerkleCompositeDAG( - store, - schemaVersionKey, - key, - ), nil } return nil, client.NewErrUnknownCRDT(cType) }