From a0535f1e1ba65986209f8ab4e10d489050d52e5e Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Tue, 19 Nov 2024 17:24:51 -0500 Subject: [PATCH] Simplify versionedFetcher.merge --- internal/db/fetcher/versioned.go | 62 ++++++++++++++++---------------- 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/internal/db/fetcher/versioned.go b/internal/db/fetcher/versioned.go index 27c4d80baf..2e207b4490 100644 --- a/internal/db/fetcher/versioned.go +++ b/internal/db/fetcher/versioned.go @@ -16,6 +16,7 @@ import ( "fmt" "github.com/ipfs/go-cid" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/sourcenetwork/immutable" @@ -309,39 +310,26 @@ func (vf *VersionedFetcher) merge(c cid.Cid) error { return err } - link, err := block.GenerateLink() - if err != nil { - return err - } - - mcrdt := merklecrdt.NewMerkleCompositeDAG( - vf.store, - keys.NewCollectionSchemaVersionKey(block.Delta.GetSchemaVersionID(), vf.col.Description().RootID), - keys.DataStoreKey{ - CollectionRootID: vf.col.Description().RootID, - DocID: string(block.Delta.GetDocID()), - FieldID: fmt.Sprint(core.COMPOSITE_NAMESPACE), - }, - ) - err = mcrdt.Clock().ProcessBlock(vf.ctx, block, link) - if err != nil { - return err - } - - // handle subgraphs - for _, l := range block.Links { - // get node - subBlock, err := vf.getDAGBlock(l.Link.Cid) - if err != nil { - return err - } + var mcrdt merklecrdt.MerkleCRDT + switch { + case block.Delta.IsComposite(): + mcrdt = merklecrdt.NewMerkleCompositeDAG( + vf.store, + keys.NewCollectionSchemaVersionKey(block.Delta.GetSchemaVersionID(), vf.col.Description().RootID), + keys.DataStoreKey{ + CollectionRootID: vf.col.Description().RootID, + DocID: string(block.Delta.GetDocID()), + FieldID: fmt.Sprint(core.COMPOSITE_NAMESPACE), + }, + ) - field, ok := vf.col.Definition().GetFieldByName(l.Name) + default: + field, ok := vf.col.Definition().GetFieldByName(block.Delta.GetFieldName()) if !ok { - return client.NewErrFieldNotExist(l.Name) + return client.NewErrFieldNotExist(block.Delta.GetFieldName()) } - mcrdt, err := merklecrdt.FieldLevelCRDTWithStore( + mcrdt, err = merklecrdt.FieldLevelCRDTWithStore( vf.store, keys.NewCollectionSchemaVersionKey(block.Delta.GetSchemaVersionID(), vf.col.Description().RootID), field.Typ, @@ -356,8 +344,22 @@ func (vf *VersionedFetcher) merge(c cid.Cid) error { if err != nil { return err } + } - err = mcrdt.Clock().ProcessBlock(vf.ctx, subBlock, l.Link) + err = mcrdt.Clock().ProcessBlock( + vf.ctx, + block, + cidlink.Link{ + Cid: c, + }, + ) + if err != nil { + return err + } + + // handle subgraphs + for _, l := range block.AllLinks() { + err = vf.merge(l.Cid) if err != nil { return err }