From 3c91d3ab1a0fc595d1a4d20d9540c7b7cc23b1c6 Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Mon, 23 Sep 2024 13:25:04 -0400 Subject: [PATCH] WIP - Extract heads from links Now we no longer need to store const string values in the blocks --- internal/core/block/block.go | 31 ++++++++++++++++------------ internal/core/block/block_test.go | 15 +++++--------- internal/core/type.go | 1 - internal/db/fetcher/versioned.go | 15 ++------------ internal/db/merge.go | 34 +++++++++++++------------------ internal/db/merge_test.go | 11 +++++----- internal/merkle/clock/clock.go | 10 +-------- net/sync_dag.go | 4 ++-- 8 files changed, 47 insertions(+), 74 deletions(-) diff --git a/internal/core/block/block.go b/internal/core/block/block.go index 1ec62fe939..2abb9f3790 100644 --- a/internal/core/block/block.go +++ b/internal/core/block/block.go @@ -121,6 +121,9 @@ type Encryption struct { type Block struct { // Delta is the CRDT delta that is stored in the block. Delta crdt.CRDT + + Heads []cidlink.Link + // Links are the links to other blocks in the DAG. Links []DAGLink // Encryption contains the encryption information for the block's delta. @@ -142,15 +145,16 @@ func (block *Block) Clone() *Block { } } -// GetHeadLinks returns the CIDs of the previous blocks. There can be more than 1 with multiple heads. -func (block *Block) GetHeadLinks() []cid.Cid { - var heads []cid.Cid +func (block *Block) Links2() []cidlink.Link { + result := make([]cidlink.Link, 0, len(block.Heads)+len(block.Links)) + + result = append(result, block.Heads...) + for _, link := range block.Links { - if link.Name == core.HEAD { - heads = append(heads, link.Cid) - } + result = append(result, link.Link) } - return heads + + return result } // IPLDSchemaBytes returns the IPLD schema representation for the block. @@ -160,6 +164,7 @@ func (block *Block) IPLDSchemaBytes() []byte { return []byte(` type Block struct { delta CRDT + heads [Link] links [DAGLink] encryption optional Link } @@ -188,13 +193,12 @@ func New(delta core.Delta, links []DAGLink, heads ...cid.Cid) *Block { sort.Slice(heads, func(i, j int) bool { return strings.Compare(heads[i].String(), heads[j].String()) < 0 }) + + headLinks := make([]cidlink.Link, len(heads)) for _, head := range heads { - blockLinks = append( - blockLinks, - DAGLink{ - Name: core.HEAD, - Link: cidlink.Link{Cid: head}, - }, + headLinks = append( + headLinks, + cidlink.Link{Cid: head}, ) } @@ -207,6 +211,7 @@ func New(delta core.Delta, links []DAGLink, heads ...cid.Cid) *Block { blockLinks = append(blockLinks, links...) return &Block{ + Heads: headLinks, Links: blockLinks, Delta: crdt.NewCRDT(delta), } diff --git a/internal/core/block/block_test.go b/internal/core/block/block_test.go index d7fe2d1bf0..c667fcc742 100644 --- a/internal/core/block/block_test.go +++ b/internal/core/block/block_test.go @@ -21,7 +21,6 @@ import ( "github.com/ipld/go-ipld-prime/storage/memstore" "github.com/stretchr/testify/require" - "github.com/sourcenetwork/defradb/internal/core" "github.com/sourcenetwork/defradb/internal/core/crdt" ) @@ -75,11 +74,8 @@ func generateBlocks(lsys *linking.LinkSystem) (cidlink.Link, error) { Data: []byte("Johny"), }, }, - Links: []DAGLink{ - { - Name: core.HEAD, - Link: fieldBlockLink.(cidlink.Link), - }, + Heads: []cidlink.Link{ + fieldBlockLink.(cidlink.Link), }, } fieldUpdateBlockLink, err := lsys.Store(ipld.LinkContext{}, GetLinkPrototype(), fieldUpdateBlock.GenerateNode()) @@ -97,11 +93,10 @@ func generateBlocks(lsys *linking.LinkSystem) (cidlink.Link, error) { Status: 1, }, }, + Heads: []cidlink.Link{ + compositeBlockLink.(cidlink.Link), + }, Links: []DAGLink{ - { - Name: core.HEAD, - Link: compositeBlockLink.(cidlink.Link), - }, { Name: "name", Link: fieldUpdateBlockLink.(cidlink.Link), diff --git a/internal/core/type.go b/internal/core/type.go index 9162e5a2bf..30c26dd831 100644 --- a/internal/core/type.go +++ b/internal/core/type.go @@ -12,5 +12,4 @@ package core const ( COMPOSITE_NAMESPACE = "C" - HEAD = "_head" ) diff --git a/internal/db/fetcher/versioned.go b/internal/db/fetcher/versioned.go index 80b71cdd88..2660664bcd 100644 --- a/internal/db/fetcher/versioned.go +++ b/internal/db/fetcher/versioned.go @@ -312,20 +312,14 @@ func (vf *VersionedFetcher) seekNext(c cid.Cid, topParent bool) error { } // only seekNext on parent if we have a HEAD link - l, ok := block.GetLinkByName(core.HEAD) - if ok { - err := vf.seekNext(l.Cid, true) + if len(block.Heads) != 0 { + err := vf.seekNext(block.Heads[0].Cid, true) if err != nil { return err } } - // loop over links and ignore head links for _, l := range block.Links { - if l.Name == core.HEAD { - continue - } - err := vf.seekNext(l.Link.Cid, false) if err != nil { return err @@ -362,12 +356,7 @@ func (vf *VersionedFetcher) merge(c cid.Cid) error { } // handle subgraphs - // loop over links and ignore head links for _, l := range block.Links { - if l.Name == core.HEAD { - continue - } - // get node subBlock, err := vf.getDAGBlock(l.Link.Cid) if err != nil { diff --git a/internal/db/merge.go b/internal/db/merge.go index 58c89cfc4e..0f9d385e13 100644 --- a/internal/db/merge.go +++ b/internal/db/merge.go @@ -204,8 +204,8 @@ func (mp *mergeProcessor) loadComposites( // In this case, we also need to walk back the merge target's DAG until we reach a common block. if block.Delta.GetPriority() >= mt.headHeight { mp.composites.PushFront(block) - for _, prevCid := range block.GetHeadLinks() { - err := mp.loadComposites(ctx, prevCid, mt) + for _, head := range block.Heads { + err := mp.loadComposites(ctx, head.Cid, mt) if err != nil { return err } @@ -213,21 +213,19 @@ func (mp *mergeProcessor) loadComposites( } else { newMT := newMergeTarget() for _, b := range mt.heads { - for _, link := range b.Links { - if link.Name == core.HEAD { - nd, err := mp.blockLS.Load(linking.LinkContext{Ctx: ctx}, link.Link, coreblock.SchemaPrototype) - if err != nil { - return err - } - - childBlock, err := coreblock.GetFromNode(nd) - if err != nil { - return err - } - - newMT.heads[link.Cid] = childBlock - newMT.headHeight = childBlock.Delta.GetPriority() + for _, link := range b.Heads { + nd, err := mp.blockLS.Load(linking.LinkContext{Ctx: ctx}, link, coreblock.SchemaPrototype) + if err != nil { + return err } + + childBlock, err := coreblock.GetFromNode(nd) + if err != nil { + return err + } + + newMT.heads[link.Cid] = childBlock + newMT.headHeight = childBlock.Delta.GetPriority() } } return mp.loadComposites(ctx, blockCid, newMT) @@ -387,10 +385,6 @@ func (mp *mergeProcessor) processBlock( } for _, link := range dagBlock.Links { - if link.Name == core.HEAD { - continue - } - nd, err := mp.blockLS.Load(linking.LinkContext{Ctx: ctx}, link.Link, coreblock.SchemaPrototype) if err != nil { return err diff --git a/internal/db/merge_test.go b/internal/db/merge_test.go index 55cc172634..e4a06cb482 100644 --- a/internal/db/merge_test.go +++ b/internal/db/merge_test.go @@ -22,7 +22,6 @@ import ( "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/event" - "github.com/sourcenetwork/defradb/internal/core" coreblock "github.com/sourcenetwork/defradb/internal/core/block" "github.com/sourcenetwork/defradb/internal/core/crdt" ) @@ -228,14 +227,13 @@ type compositeInfo struct { } func (d *dagBuilder) generateCompositeUpdate(lsys *linking.LinkSystem, fields map[string]any, from compositeInfo) (compositeInfo, error) { - links := []coreblock.DAGLink{} + heads := []cidlink.Link{} newPriority := from.height + 1 if from.link.ByteLen() != 0 { - links = append(links, coreblock.DAGLink{ - Name: core.HEAD, - Link: from.link, - }) + heads = append(heads, from.link) } + + links := []coreblock.DAGLink{} for field, val := range fields { d.fieldsHeight[field]++ // Generate new Block and save to lsys @@ -270,6 +268,7 @@ func (d *dagBuilder) generateCompositeUpdate(lsys *linking.LinkSystem, fields ma Status: 1, }, }, + Heads: heads, Links: links, } diff --git a/internal/merkle/clock/clock.go b/internal/merkle/clock/clock.go index b5b55e1374..e6e6dbf25f 100644 --- a/internal/merkle/clock/clock.go +++ b/internal/merkle/clock/clock.go @@ -241,15 +241,7 @@ func (mc *MerkleClock) updateHeads( ) error { priority := block.Delta.GetPriority() - // check if we have any HEAD links - hasHeads := false - for _, l := range block.Links { - if l.Name == core.HEAD { - hasHeads = true - break - } - } - if !hasHeads { // reached the bottom, at a leaf + if len(block.Heads) == 0 { // reached the bottom, at a leaf err := mc.headset.Write(ctx, blockLink.Cid, priority) if err != nil { return NewErrAddingHead(blockLink.Cid, err) diff --git a/net/sync_dag.go b/net/sync_dag.go index e9c17035bf..e9952a016f 100644 --- a/net/sync_dag.go +++ b/net/sync_dag.go @@ -72,9 +72,9 @@ func loadBlockLinks(ctx context.Context, lsys linking.LinkSystem, block *coreblo cancel() } - for _, lnk := range block.Links { + for _, lnk := range block.Links2() { wg.Add(1) - go func(lnk coreblock.DAGLink) { + go func(lnk cidlink.Link) { defer wg.Done() if ctx.Err() != nil { return