diff --git a/internal/db/fetcher/versioned.go b/internal/db/fetcher/versioned.go index b8b6527d21..f29f88d560 100644 --- a/internal/db/fetcher/versioned.go +++ b/internal/db/fetcher/versioned.go @@ -95,8 +95,6 @@ type VersionedFetcher struct { acp immutable.Option[acp.ACP] col client.Collection - // @todo index *client.IndexDescription - mCRDTs map[client.FieldID]merklecrdt.MerkleCRDT } // Init initializes the VersionedFetcher. @@ -115,7 +113,6 @@ func (vf *VersionedFetcher) Init( vf.acp = acp vf.col = col vf.queuedCids = list.New() - vf.mCRDTs = make(map[client.FieldID]merklecrdt.MerkleCRDT) vf.txn = txn // create store @@ -323,20 +320,15 @@ func (vf *VersionedFetcher) merge(c cid.Cid) error { return err } - // first arg 0 is the index for the composite DAG in the mCRDTs cache - mcrdt, exists := vf.mCRDTs[0] - if !exists { - mcrdt = merklecrdt.NewMerkleCompositeDAG( - vf.store, - keys.NewCollectionSchemaVersionKey(block.Delta.GetSchemaVersionID(), vf.col.Description().RootID), - keys.DataStoreKey{ - CollectionRootID: vf.col.Description().RootID, - DocID: string(block.Delta.GetDocID()), - FieldID: fmt.Sprint(core.COMPOSITE_NAMESPACE), - }, - ) - vf.mCRDTs[0] = mcrdt - } + mcrdt := merklecrdt.NewMerkleCompositeDAG( + vf.store, + keys.NewCollectionSchemaVersionKey(block.Delta.GetSchemaVersionID(), vf.col.Description().RootID), + keys.DataStoreKey{ + CollectionRootID: vf.col.Description().RootID, + DocID: string(block.Delta.GetDocID()), + FieldID: fmt.Sprint(core.COMPOSITE_NAMESPACE), + }, + ) err = mcrdt.Clock().ProcessBlock(vf.ctx, block, link) if err != nil { return err @@ -355,24 +347,20 @@ func (vf *VersionedFetcher) merge(c cid.Cid) error { return client.NewErrFieldNotExist(l.Name) } - mcrdt, exists := vf.mCRDTs[field.ID] - if !exists { - mcrdt, err = merklecrdt.FieldLevelCRDTWithStore( - vf.store, - keys.NewCollectionSchemaVersionKey(block.Delta.GetSchemaVersionID(), vf.col.Description().RootID), - field.Typ, - field.Kind, - keys.DataStoreKey{ - CollectionRootID: vf.col.Description().RootID, - DocID: string(block.Delta.GetDocID()), - FieldID: fmt.Sprint(field.ID), - }, - field.Name, - ) - if err != nil { - return err - } - vf.mCRDTs[field.ID] = mcrdt + mcrdt, err := merklecrdt.FieldLevelCRDTWithStore( + vf.store, + keys.NewCollectionSchemaVersionKey(block.Delta.GetSchemaVersionID(), vf.col.Description().RootID), + field.Typ, + field.Kind, + keys.DataStoreKey{ + CollectionRootID: vf.col.Description().RootID, + DocID: string(block.Delta.GetDocID()), + FieldID: fmt.Sprint(field.ID), + }, + field.Name, + ) + if err != nil { + return err } err = mcrdt.Clock().ProcessBlock(vf.ctx, subBlock, l.Link)