From 811dea34c6869d8e537773e958204f9c630355b0 Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Mon, 23 Sep 2024 13:25:04 -0400 Subject: [PATCH] WIP --- internal/core/block/block.go | 31 +++++++++++++++++------------ internal/db/fetcher/versioned.go | 15 ++------------ internal/db/merge.go | 34 +++++++++++++------------------- internal/merkle/clock/clock.go | 10 +--------- net/sync_dag.go | 4 ++-- 5 files changed, 37 insertions(+), 57 deletions(-) diff --git a/internal/core/block/block.go b/internal/core/block/block.go index 1ec62fe939..2abb9f3790 100644 --- a/internal/core/block/block.go +++ b/internal/core/block/block.go @@ -121,6 +121,9 @@ type Encryption struct { type Block struct { // Delta is the CRDT delta that is stored in the block. Delta crdt.CRDT + + Heads []cidlink.Link + // Links are the links to other blocks in the DAG. Links []DAGLink // Encryption contains the encryption information for the block's delta. @@ -142,15 +145,16 @@ func (block *Block) Clone() *Block { } } -// GetHeadLinks returns the CIDs of the previous blocks. There can be more than 1 with multiple heads. -func (block *Block) GetHeadLinks() []cid.Cid { - var heads []cid.Cid +func (block *Block) Links2() []cidlink.Link { + result := make([]cidlink.Link, 0, len(block.Heads)+len(block.Links)) + + result = append(result, block.Heads...) + for _, link := range block.Links { - if link.Name == core.HEAD { - heads = append(heads, link.Cid) - } + result = append(result, link.Link) } - return heads + + return result } // IPLDSchemaBytes returns the IPLD schema representation for the block. @@ -160,6 +164,7 @@ func (block *Block) IPLDSchemaBytes() []byte { return []byte(` type Block struct { delta CRDT + heads [Link] links [DAGLink] encryption optional Link } @@ -188,13 +193,12 @@ func New(delta core.Delta, links []DAGLink, heads ...cid.Cid) *Block { sort.Slice(heads, func(i, j int) bool { return strings.Compare(heads[i].String(), heads[j].String()) < 0 }) + + headLinks := make([]cidlink.Link, len(heads)) for _, head := range heads { - blockLinks = append( - blockLinks, - DAGLink{ - Name: core.HEAD, - Link: cidlink.Link{Cid: head}, - }, + headLinks = append( + headLinks, + cidlink.Link{Cid: head}, ) } @@ -207,6 +211,7 @@ func New(delta core.Delta, links []DAGLink, heads ...cid.Cid) *Block { blockLinks = append(blockLinks, links...) return &Block{ + Heads: headLinks, Links: blockLinks, Delta: crdt.NewCRDT(delta), } diff --git a/internal/db/fetcher/versioned.go b/internal/db/fetcher/versioned.go index 80b71cdd88..2660664bcd 100644 --- a/internal/db/fetcher/versioned.go +++ b/internal/db/fetcher/versioned.go @@ -312,20 +312,14 @@ func (vf *VersionedFetcher) seekNext(c cid.Cid, topParent bool) error { } // only seekNext on parent if we have a HEAD link - l, ok := block.GetLinkByName(core.HEAD) - if ok { - err := vf.seekNext(l.Cid, true) + if len(block.Heads) != 0 { + err := vf.seekNext(block.Heads[0].Cid, true) if err != nil { return err } } - // loop over links and ignore head links for _, l := range block.Links { - if l.Name == core.HEAD { - continue - } - err := vf.seekNext(l.Link.Cid, false) if err != nil { return err @@ -362,12 +356,7 @@ func (vf *VersionedFetcher) merge(c cid.Cid) error { } // handle subgraphs - // loop over links and ignore head links for _, l := range block.Links { - if l.Name == core.HEAD { - continue - } - // get node subBlock, err := vf.getDAGBlock(l.Link.Cid) if err != nil { diff --git a/internal/db/merge.go b/internal/db/merge.go index 58c89cfc4e..0f9d385e13 100644 --- a/internal/db/merge.go +++ b/internal/db/merge.go @@ -204,8 +204,8 @@ func (mp *mergeProcessor) loadComposites( // In this case, we also need to walk back the merge target's DAG until we reach a common block. if block.Delta.GetPriority() >= mt.headHeight { mp.composites.PushFront(block) - for _, prevCid := range block.GetHeadLinks() { - err := mp.loadComposites(ctx, prevCid, mt) + for _, head := range block.Heads { + err := mp.loadComposites(ctx, head.Cid, mt) if err != nil { return err } @@ -213,21 +213,19 @@ func (mp *mergeProcessor) loadComposites( } else { newMT := newMergeTarget() for _, b := range mt.heads { - for _, link := range b.Links { - if link.Name == core.HEAD { - nd, err := mp.blockLS.Load(linking.LinkContext{Ctx: ctx}, link.Link, coreblock.SchemaPrototype) - if err != nil { - return err - } - - childBlock, err := coreblock.GetFromNode(nd) - if err != nil { - return err - } - - newMT.heads[link.Cid] = childBlock - newMT.headHeight = childBlock.Delta.GetPriority() + for _, link := range b.Heads { + nd, err := mp.blockLS.Load(linking.LinkContext{Ctx: ctx}, link, coreblock.SchemaPrototype) + if err != nil { + return err } + + childBlock, err := coreblock.GetFromNode(nd) + if err != nil { + return err + } + + newMT.heads[link.Cid] = childBlock + newMT.headHeight = childBlock.Delta.GetPriority() } } return mp.loadComposites(ctx, blockCid, newMT) @@ -387,10 +385,6 @@ func (mp *mergeProcessor) processBlock( } for _, link := range dagBlock.Links { - if link.Name == core.HEAD { - continue - } - nd, err := mp.blockLS.Load(linking.LinkContext{Ctx: ctx}, link.Link, coreblock.SchemaPrototype) if err != nil { return err diff --git a/internal/merkle/clock/clock.go b/internal/merkle/clock/clock.go index b5b55e1374..e6e6dbf25f 100644 --- a/internal/merkle/clock/clock.go +++ b/internal/merkle/clock/clock.go @@ -241,15 +241,7 @@ func (mc *MerkleClock) updateHeads( ) error { priority := block.Delta.GetPriority() - // check if we have any HEAD links - hasHeads := false - for _, l := range block.Links { - if l.Name == core.HEAD { - hasHeads = true - break - } - } - if !hasHeads { // reached the bottom, at a leaf + if len(block.Heads) == 0 { // reached the bottom, at a leaf err := mc.headset.Write(ctx, blockLink.Cid, priority) if err != nil { return NewErrAddingHead(blockLink.Cid, err) diff --git a/net/sync_dag.go b/net/sync_dag.go index e9c17035bf..e9952a016f 100644 --- a/net/sync_dag.go +++ b/net/sync_dag.go @@ -72,9 +72,9 @@ func loadBlockLinks(ctx context.Context, lsys linking.LinkSystem, block *coreblo cancel() } - for _, lnk := range block.Links { + for _, lnk := range block.Links2() { wg.Add(1) - go func(lnk coreblock.DAGLink) { + go func(lnk cidlink.Link) { defer wg.Done() if ctx.Err() != nil { return