From 8e41b5824960fb59fd67ddd269de0c9cdb292c43 Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Mon, 23 Sep 2024 13:25:04 -0400 Subject: [PATCH] WIP - Extract heads from links Now we no longer need to store const string values in the blocks --- internal/core/block/block.go | 35 +++++++++++++++----------- internal/core/block/block_test.go | 15 ++++------- internal/core/type.go | 1 - internal/db/fetcher/versioned.go | 15 ++--------- internal/db/merge.go | 34 +++++++++++-------------- internal/db/merge_test.go | 11 ++++----- internal/merkle/clock/clock.go | 12 ++------- internal/planner/commit.go | 41 +++++++++++++++---------------- net/sync_dag.go | 4 +-- 9 files changed, 70 insertions(+), 98 deletions(-) diff --git a/internal/core/block/block.go b/internal/core/block/block.go index 1ec62fe939..b15ff93369 100644 --- a/internal/core/block/block.go +++ b/internal/core/block/block.go @@ -121,6 +121,9 @@ type Encryption struct { type Block struct { // Delta is the CRDT delta that is stored in the block. Delta crdt.CRDT + + Heads []cidlink.Link + // Links are the links to other blocks in the DAG. Links []DAGLink // Encryption contains the encryption information for the block's delta. @@ -137,20 +140,22 @@ func (block *Block) IsEncrypted() bool { func (block *Block) Clone() *Block { return &Block{ Delta: block.Delta.Clone(), + Heads: block.Heads, Links: block.Links, Encryption: block.Encryption, } } -// GetHeadLinks returns the CIDs of the previous blocks. There can be more than 1 with multiple heads. -func (block *Block) GetHeadLinks() []cid.Cid { - var heads []cid.Cid +func (block *Block) Links2() []cidlink.Link { + result := make([]cidlink.Link, 0, len(block.Heads)+len(block.Links)) + + result = append(result, block.Heads...) + for _, link := range block.Links { - if link.Name == core.HEAD { - heads = append(heads, link.Cid) - } + result = append(result, link.Link) } - return heads + + return result } // IPLDSchemaBytes returns the IPLD schema representation for the block. @@ -160,6 +165,7 @@ func (block *Block) IPLDSchemaBytes() []byte { return []byte(` type Block struct { delta CRDT + heads [Link] links [DAGLink] encryption optional Link } @@ -181,20 +187,17 @@ func (enc *Encryption) IPLDSchemaBytes() []byte { // New creates a new block with the given delta and links. func New(delta core.Delta, links []DAGLink, heads ...cid.Cid) *Block { - blockLinks := make([]DAGLink, 0, len(links)+len(heads)) - // Sort the heads lexicographically by CID. // We need to do this to ensure that the block is deterministic. sort.Slice(heads, func(i, j int) bool { return strings.Compare(heads[i].String(), heads[j].String()) < 0 }) + + headLinks := make([]cidlink.Link, 0, len(heads)) for _, head := range heads { - blockLinks = append( - blockLinks, - DAGLink{ - Name: core.HEAD, - Link: cidlink.Link{Cid: head}, - }, + headLinks = append( + headLinks, + cidlink.Link{Cid: head}, ) } @@ -204,9 +207,11 @@ func New(delta core.Delta, links []DAGLink, heads ...cid.Cid) *Block { return strings.Compare(links[i].Cid.String(), links[j].Cid.String()) < 0 }) + blockLinks := make([]DAGLink, 0, len(links)) blockLinks = append(blockLinks, links...) return &Block{ + Heads: headLinks, Links: blockLinks, Delta: crdt.NewCRDT(delta), } diff --git a/internal/core/block/block_test.go b/internal/core/block/block_test.go index d7fe2d1bf0..c667fcc742 100644 --- a/internal/core/block/block_test.go +++ b/internal/core/block/block_test.go @@ -21,7 +21,6 @@ import ( "github.com/ipld/go-ipld-prime/storage/memstore" "github.com/stretchr/testify/require" - "github.com/sourcenetwork/defradb/internal/core" "github.com/sourcenetwork/defradb/internal/core/crdt" ) @@ -75,11 +74,8 @@ func generateBlocks(lsys *linking.LinkSystem) (cidlink.Link, error) { Data: []byte("Johny"), }, }, - Links: []DAGLink{ - { - Name: core.HEAD, - Link: fieldBlockLink.(cidlink.Link), - }, + Heads: []cidlink.Link{ + fieldBlockLink.(cidlink.Link), }, } fieldUpdateBlockLink, err := lsys.Store(ipld.LinkContext{}, GetLinkPrototype(), fieldUpdateBlock.GenerateNode()) @@ -97,11 +93,10 @@ func generateBlocks(lsys *linking.LinkSystem) (cidlink.Link, error) { Status: 1, }, }, + Heads: []cidlink.Link{ + compositeBlockLink.(cidlink.Link), + }, Links: []DAGLink{ - { - Name: core.HEAD, - Link: compositeBlockLink.(cidlink.Link), - }, { Name: "name", Link: fieldUpdateBlockLink.(cidlink.Link), diff --git a/internal/core/type.go b/internal/core/type.go index 9162e5a2bf..30c26dd831 100644 --- a/internal/core/type.go +++ b/internal/core/type.go @@ -12,5 +12,4 @@ package core const ( COMPOSITE_NAMESPACE = "C" - HEAD = "_head" ) diff --git a/internal/db/fetcher/versioned.go b/internal/db/fetcher/versioned.go index 80b71cdd88..2660664bcd 100644 --- a/internal/db/fetcher/versioned.go +++ b/internal/db/fetcher/versioned.go @@ -312,20 +312,14 @@ func (vf *VersionedFetcher) seekNext(c cid.Cid, topParent bool) error { } // only seekNext on parent if we have a HEAD link - l, ok := block.GetLinkByName(core.HEAD) - if ok { - err := vf.seekNext(l.Cid, true) + if len(block.Heads) != 0 { + err := vf.seekNext(block.Heads[0].Cid, true) if err != nil { return err } } - // loop over links and ignore head links for _, l := range block.Links { - if l.Name == core.HEAD { - continue - } - err := vf.seekNext(l.Link.Cid, false) if err != nil { return err @@ -362,12 +356,7 @@ func (vf *VersionedFetcher) merge(c cid.Cid) error { } // handle subgraphs - // loop over links and ignore head links for _, l := range block.Links { - if l.Name == core.HEAD { - continue - } - // get node subBlock, err := vf.getDAGBlock(l.Link.Cid) if err != nil { diff --git a/internal/db/merge.go b/internal/db/merge.go index 58c89cfc4e..0f9d385e13 100644 --- a/internal/db/merge.go +++ b/internal/db/merge.go @@ -204,8 +204,8 @@ func (mp *mergeProcessor) loadComposites( // In this case, we also need to walk back the merge target's DAG until we reach a common block. if block.Delta.GetPriority() >= mt.headHeight { mp.composites.PushFront(block) - for _, prevCid := range block.GetHeadLinks() { - err := mp.loadComposites(ctx, prevCid, mt) + for _, head := range block.Heads { + err := mp.loadComposites(ctx, head.Cid, mt) if err != nil { return err } @@ -213,21 +213,19 @@ func (mp *mergeProcessor) loadComposites( } else { newMT := newMergeTarget() for _, b := range mt.heads { - for _, link := range b.Links { - if link.Name == core.HEAD { - nd, err := mp.blockLS.Load(linking.LinkContext{Ctx: ctx}, link.Link, coreblock.SchemaPrototype) - if err != nil { - return err - } - - childBlock, err := coreblock.GetFromNode(nd) - if err != nil { - return err - } - - newMT.heads[link.Cid] = childBlock - newMT.headHeight = childBlock.Delta.GetPriority() + for _, link := range b.Heads { + nd, err := mp.blockLS.Load(linking.LinkContext{Ctx: ctx}, link, coreblock.SchemaPrototype) + if err != nil { + return err } + + childBlock, err := coreblock.GetFromNode(nd) + if err != nil { + return err + } + + newMT.heads[link.Cid] = childBlock + newMT.headHeight = childBlock.Delta.GetPriority() } } return mp.loadComposites(ctx, blockCid, newMT) @@ -387,10 +385,6 @@ func (mp *mergeProcessor) processBlock( } for _, link := range dagBlock.Links { - if link.Name == core.HEAD { - continue - } - nd, err := mp.blockLS.Load(linking.LinkContext{Ctx: ctx}, link.Link, coreblock.SchemaPrototype) if err != nil { return err diff --git a/internal/db/merge_test.go b/internal/db/merge_test.go index 55cc172634..e4a06cb482 100644 --- a/internal/db/merge_test.go +++ b/internal/db/merge_test.go @@ -22,7 +22,6 @@ import ( "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/event" - "github.com/sourcenetwork/defradb/internal/core" coreblock "github.com/sourcenetwork/defradb/internal/core/block" "github.com/sourcenetwork/defradb/internal/core/crdt" ) @@ -228,14 +227,13 @@ type compositeInfo struct { } func (d *dagBuilder) generateCompositeUpdate(lsys *linking.LinkSystem, fields map[string]any, from compositeInfo) (compositeInfo, error) { - links := []coreblock.DAGLink{} + heads := []cidlink.Link{} newPriority := from.height + 1 if from.link.ByteLen() != 0 { - links = append(links, coreblock.DAGLink{ - Name: core.HEAD, - Link: from.link, - }) + heads = append(heads, from.link) } + + links := []coreblock.DAGLink{} for field, val := range fields { d.fieldsHeight[field]++ // Generate new Block and save to lsys @@ -270,6 +268,7 @@ func (d *dagBuilder) generateCompositeUpdate(lsys *linking.LinkSystem, fields ma Status: 1, }, }, + Heads: heads, Links: links, } diff --git a/internal/merkle/clock/clock.go b/internal/merkle/clock/clock.go index b5b55e1374..806eb0fb2c 100644 --- a/internal/merkle/clock/clock.go +++ b/internal/merkle/clock/clock.go @@ -217,7 +217,7 @@ func encryptBlock( return nil, err } clonedCRDT.SetData(bytes) - return &coreblock.Block{Delta: clonedCRDT, Links: block.Links}, nil + return &coreblock.Block{Delta: clonedCRDT, Heads: block.Heads, Links: block.Links}, nil } // ProcessBlock merges the delta CRDT and updates the state accordingly. @@ -241,15 +241,7 @@ func (mc *MerkleClock) updateHeads( ) error { priority := block.Delta.GetPriority() - // check if we have any HEAD links - hasHeads := false - for _, l := range block.Links { - if l.Name == core.HEAD { - hasHeads = true - break - } - } - if !hasHeads { // reached the bottom, at a leaf + if len(block.Heads) == 0 { // reached the bottom, at a leaf err := mc.headset.Write(ctx, blockLink.Cid, priority) if err != nil { return NewErrAddingHead(blockLink.Cid, err) diff --git a/internal/planner/commit.go b/internal/planner/commit.go index 76825afe15..5ccf3d796c 100644 --- a/internal/planner/commit.go +++ b/internal/planner/commit.go @@ -12,7 +12,6 @@ package planner import ( cid "github.com/ipfs/go-cid" - cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/sourcenetwork/immutable" @@ -221,7 +220,7 @@ func (n *dagScanNode) Next() (bool, error) { return false, err } - currentValue, heads, err := n.dagBlockToNodeDoc(dagBlock) + currentValue, err := n.dagBlockToNodeDoc(dagBlock) if err != nil { return false, err } @@ -239,10 +238,10 @@ func (n *dagScanNode) Next() (bool, error) { if !n.commitSelect.Depth.HasValue() || n.depthVisited < n.commitSelect.Depth.Value() { // Insert the newly fetched cids into the slice of queued items, in reverse order // so that the last new cid will be at the front of the slice - n.queuedCids = append(make([]*cid.Cid, len(heads)), n.queuedCids...) + n.queuedCids = append(make([]*cid.Cid, len(dagBlock.Heads)), n.queuedCids...) - for i, head := range heads { - n.queuedCids[len(heads)-i-1] = &head.Cid + for i, head := range dagBlock.Heads { + n.queuedCids[len(dagBlock.Heads)-i-1] = &head.Cid } } @@ -293,11 +292,11 @@ which returns the current dag commit for the stored CRDT value. All the dagScanNode endpoints use similar structures */ -func (n *dagScanNode) dagBlockToNodeDoc(block *coreblock.Block) (core.Doc, []cidlink.Link, error) { +func (n *dagScanNode) dagBlockToNodeDoc(block *coreblock.Block) (core.Doc, error) { commit := n.commitSelect.DocumentMapping.NewDoc() link, err := block.GenerateLink() if err != nil { - return core.Doc{}, nil, err + return core.Doc{}, err } n.commitSelect.DocumentMapping.SetFirstOfName(&commit, request.CidFieldName, link.String()) @@ -323,17 +322,17 @@ func (n *dagScanNode) dagBlockToNodeDoc(block *coreblock.Block) (core.Doc, []cid }, ) if err != nil { - return core.Doc{}, nil, err + return core.Doc{}, err } if len(cols) == 0 { - return core.Doc{}, nil, client.NewErrCollectionNotFoundForSchemaVersion(schemaVersionId) + return core.Doc{}, client.NewErrCollectionNotFoundForSchemaVersion(schemaVersionId) } // Because we only care about the schema, we can safely take the first - the schema is the same // for all in the set. field, ok := cols[0].Definition().GetFieldByName(fName) if !ok { - return core.Doc{}, nil, client.NewErrFieldNotExist(fName) + return core.Doc{}, client.NewErrFieldNotExist(fName) } fieldID = field.ID.String() } @@ -362,10 +361,10 @@ func (n *dagScanNode) dagBlockToNodeDoc(block *coreblock.Block) (core.Doc, []cid }, ) if err != nil { - return core.Doc{}, nil, err + return core.Doc{}, err } if len(cols) == 0 { - return core.Doc{}, nil, client.NewErrCollectionNotFoundForSchemaVersion(schemaVersionId) + return core.Doc{}, client.NewErrCollectionNotFoundForSchemaVersion(schemaVersionId) } // WARNING: This will become incorrect once we allow multiple collections to share the same schema, @@ -374,8 +373,6 @@ func (n *dagScanNode) dagBlockToNodeDoc(block *coreblock.Block) (core.Doc, []cid n.commitSelect.DocumentMapping.SetFirstOfName(&commit, request.CollectionIDFieldName, int64(cols[0].ID())) - heads := make([]cidlink.Link, 0) - // links linksIndexes := n.commitSelect.DocumentMapping.IndexesByName[request.LinksFieldName] @@ -383,6 +380,14 @@ func (n *dagScanNode) dagBlockToNodeDoc(block *coreblock.Block) (core.Doc, []cid links := make([]core.Doc, len(block.Links)) linksMapping := n.commitSelect.DocumentMapping.ChildMappings[linksIndex] + for i, l := range block.Heads { + link := linksMapping.NewDoc() + linksMapping.SetFirstOfName(&link, request.LinksNameFieldName, "_head") + linksMapping.SetFirstOfName(&link, request.LinksCidFieldName, l.Cid.String()) + + links[i] = link + } + for i, l := range block.Links { link := linksMapping.NewDoc() linksMapping.SetFirstOfName(&link, request.LinksNameFieldName, l.Name) @@ -394,11 +399,5 @@ func (n *dagScanNode) dagBlockToNodeDoc(block *coreblock.Block) (core.Doc, []cid commit.Fields[linksIndex] = links } - for _, l := range block.Links { - if l.Name == "_head" { - heads = append(heads, l.Link) - } - } - - return commit, heads, nil + return commit, nil } diff --git a/net/sync_dag.go b/net/sync_dag.go index e9c17035bf..e9952a016f 100644 --- a/net/sync_dag.go +++ b/net/sync_dag.go @@ -72,9 +72,9 @@ func loadBlockLinks(ctx context.Context, lsys linking.LinkSystem, block *coreblo cancel() } - for _, lnk := range block.Links { + for _, lnk := range block.Links2() { wg.Add(1) - go func(lnk coreblock.DAGLink) { + go func(lnk cidlink.Link) { defer wg.Done() if ctx.Err() != nil { return