diff --git a/internal/db/fetcher/versioned.go b/internal/db/fetcher/versioned.go index 9ac9b7d13d..9466952d90 100644 --- a/internal/db/fetcher/versioned.go +++ b/internal/db/fetcher/versioned.go @@ -16,6 +16,7 @@ import ( "fmt" "github.com/ipfs/go-cid" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/sourcenetwork/immutable" @@ -315,39 +316,25 @@ func (vf *VersionedFetcher) merge(c cid.Cid) error { return err } - link, err := block.GenerateLink() - if err != nil { - return err - } - - mcrdt := merklecrdt.NewMerkleCompositeDAG( - vf.store, - keys.NewCollectionSchemaVersionKey(block.Delta.GetSchemaVersionID(), vf.col.Description().RootID), - keys.DataStoreKey{ - CollectionRootID: vf.col.Description().RootID, - DocID: string(block.Delta.GetDocID()), - FieldID: fmt.Sprint(core.COMPOSITE_NAMESPACE), - }, - ) - err = mcrdt.Clock().ProcessBlock(vf.ctx, block, link) - if err != nil { - return err - } - - // handle subgraphs - for _, l := range block.Links { - // get node - subBlock, err := vf.getDAGBlock(l.Link.Cid) - if err != nil { - return err - } - - field, ok := vf.col.Definition().GetFieldByName(l.Name) + var mcrdt merklecrdt.MerkleCRDT + switch { + case block.Delta.IsComposite(): + mcrdt = merklecrdt.NewMerkleCompositeDAG( + vf.store, + keys.NewCollectionSchemaVersionKey(block.Delta.GetSchemaVersionID(), vf.col.Description().RootID), + keys.DataStoreKey{ + CollectionRootID: vf.col.Description().RootID, + DocID: string(block.Delta.GetDocID()), + FieldID: fmt.Sprint(core.COMPOSITE_NAMESPACE), + }, + ) + default: + field, ok := vf.col.Definition().GetFieldByName(block.Delta.GetFieldName()) if !ok { - return client.NewErrFieldNotExist(l.Name) + return client.NewErrFieldNotExist(block.Delta.GetFieldName()) } - mcrdt, err := merklecrdt.FieldLevelCRDTWithStore( + mcrdt, err = merklecrdt.FieldLevelCRDTWithStore( vf.store, keys.NewCollectionSchemaVersionKey(block.Delta.GetSchemaVersionID(), vf.col.Description().RootID), field.Typ, @@ -362,8 +349,22 @@ func (vf *VersionedFetcher) merge(c cid.Cid) error { if err != nil { return err } + } + + err = mcrdt.Clock().ProcessBlock( + vf.ctx, + block, + cidlink.Link{ + Cid: c, + }, + ) + if err != nil { + return err + } - err = mcrdt.Clock().ProcessBlock(vf.ctx, subBlock, l.Link) + // handle subgraphs + for _, l := range block.AllLinks() { + err = vf.merge(l.Cid) if err != nil { return err }