From d4560fccd9709a777696b56f1ff5ec63f4d736bc Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Tue, 19 Nov 2024 17:24:51 -0500 Subject: [PATCH] WIP - Simplify versionedFetcher.merge --- internal/db/fetcher/versioned.go | 55 ++++++++++++++++---------------- 1 file changed, 28 insertions(+), 27 deletions(-) diff --git a/internal/db/fetcher/versioned.go b/internal/db/fetcher/versioned.go index 92edc63a4c..a1b46d3c5f 100644 --- a/internal/db/fetcher/versioned.go +++ b/internal/db/fetcher/versioned.go @@ -16,6 +16,7 @@ import ( "fmt" "github.com/ipfs/go-cid" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/sourcenetwork/immutable" @@ -315,35 +316,21 @@ func (vf *VersionedFetcher) merge(c cid.Cid) error { return err } - link, err := block.GenerateLink() - if err != nil { - return err - } - - mcrdt := merklecrdt.NewMerkleCompositeDAG( - vf.store, - keys.NewCollectionSchemaVersionKey(block.Delta.GetSchemaVersionID(), vf.col.Description().RootID), - vf.dsKey.WithFieldID(core.COMPOSITE_NAMESPACE), - ) - err = mcrdt.Clock().ProcessBlock(vf.ctx, block, link) - if err != nil { - return err - } - - // handle subgraphs - for _, l := range block.Links { - // get node - subBlock, err := vf.getDAGBlock(l.Link.Cid) - if err != nil { - return err - } - - field, ok := vf.col.Definition().GetFieldByName(l.Name) + var mcrdt merklecrdt.MerkleCRDT + switch { + case block.Delta.IsComposite(): + mcrdt = merklecrdt.NewMerkleCompositeDAG( + vf.store, + keys.NewCollectionSchemaVersionKey(block.Delta.GetSchemaVersionID(), vf.col.Description().RootID), + vf.dsKey.WithFieldID(core.COMPOSITE_NAMESPACE), + ) + default: + field, ok := vf.col.Definition().GetFieldByName(block.Delta.GetFieldName()) if !ok { - return client.NewErrFieldNotExist(l.Name) + return client.NewErrFieldNotExist(block.Delta.GetFieldName()) } - mcrdt, err := merklecrdt.FieldLevelCRDTWithStore( + mcrdt, err = merklecrdt.FieldLevelCRDTWithStore( vf.store, keys.NewCollectionSchemaVersionKey(block.Delta.GetSchemaVersionID(), vf.col.Description().RootID), field.Typ, @@ -354,8 +341,22 @@ func (vf *VersionedFetcher) merge(c cid.Cid) error { if err != nil { return err } + } - err = mcrdt.Clock().ProcessBlock(vf.ctx, subBlock, l.Link) + err = mcrdt.Clock().ProcessBlock( + vf.ctx, + block, + cidlink.Link{ + Cid: c, + }, + ) + if err != nil { + return err + } + + // handle subgraphs + for _, l := range block.AllLinks() { + err = vf.merge(l.Cid) if err != nil { return err }