Commit

WIP - Extract heads from links
Now we no longer need to store the const string link name ("_head") in the blocks.
AndrewSisley committed Sep 23, 2024
1 parent 15f857d commit 8e41b58
Showing 9 changed files with 70 additions and 98 deletions.
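The gist of the change, restated as an illustrative sketch (not code from this commit): head links move out of the generic Links list, where they could only be recognised by the "_head" name constant, into a dedicated Heads field on the block, so callers can read them directly.

package sketch // illustrative only; assumes it sits inside the defradb module

import (
	cid "github.com/ipfs/go-cid"
	coreblock "github.com/sourcenetwork/defradb/internal/core/block"
)

// headCIDs returns the CIDs of a block's parent (head) blocks. Before this
// commit the same information had to be recovered by filtering block.Links
// on Name == core.HEAD ("_head").
func headCIDs(block *coreblock.Block) []cid.Cid {
	heads := make([]cid.Cid, 0, len(block.Heads))
	for _, head := range block.Heads {
		heads = append(heads, head.Cid)
	}
	return heads
}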
35 changes: 20 additions & 15 deletions internal/core/block/block.go
@@ -121,6 +121,9 @@ type Encryption struct {
type Block struct {
// Delta is the CRDT delta that is stored in the block.
Delta crdt.CRDT

// Heads are the links to the previous block(s); there can be more than one with multiple heads.
Heads []cidlink.Link

// Links are the links to other blocks in the DAG.
Links []DAGLink
// Encryption contains the encryption information for the block's delta.
@@ -137,20 +140,22 @@ func (block *Block) IsEncrypted() bool {
func (block *Block) Clone() *Block {
return &Block{
Delta: block.Delta.Clone(),
Heads: block.Heads,
Links: block.Links,
Encryption: block.Encryption,
}
}

// GetHeadLinks returns the CIDs of the previous blocks. There can be more than 1 with multiple heads.
func (block *Block) GetHeadLinks() []cid.Cid {
var heads []cid.Cid
// Links2 returns all of the block's links: the heads followed by the remaining DAG links.
func (block *Block) Links2() []cidlink.Link {
result := make([]cidlink.Link, 0, len(block.Heads)+len(block.Links))

result = append(result, block.Heads...)

for _, link := range block.Links {
if link.Name == core.HEAD {
heads = append(heads, link.Cid)
}
result = append(result, link.Link)
}
return heads

return result
}

// IPLDSchemaBytes returns the IPLD schema representation for the block.
@@ -160,6 +165,7 @@ func (block *Block) IPLDSchemaBytes() []byte {
return []byte(`
type Block struct {
delta CRDT
heads [Link]
links [DAGLink]
encryption optional Link
}
@@ -181,20 +187,17 @@ func (enc *Encryption) IPLDSchemaBytes() []byte {

// New creates a new block with the given delta and links.
func New(delta core.Delta, links []DAGLink, heads ...cid.Cid) *Block {
blockLinks := make([]DAGLink, 0, len(links)+len(heads))

// Sort the heads lexicographically by CID.
// We need to do this to ensure that the block is deterministic.
sort.Slice(heads, func(i, j int) bool {
return strings.Compare(heads[i].String(), heads[j].String()) < 0
})

headLinks := make([]cidlink.Link, 0, len(heads))
for _, head := range heads {
blockLinks = append(
blockLinks,
DAGLink{
Name: core.HEAD,
Link: cidlink.Link{Cid: head},
},
headLinks = append(
headLinks,
cidlink.Link{Cid: head},
)
}

@@ -204,9 +207,11 @@ func New(delta core.Delta, links []DAGLink, heads ...cid.Cid) *Block {
return strings.Compare(links[i].Cid.String(), links[j].Cid.String()) < 0
})

blockLinks := make([]DAGLink, 0, len(links))
blockLinks = append(blockLinks, links...)

return &Block{
Heads: headLinks,
Links: blockLinks,
Delta: crdt.NewCRDT(delta),
}
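A minimal usage sketch of the constructor and accessor above, assuming only the signatures shown in this diff; buildAndWalk and its parameters are illustrative names, not part of the commit.

package sketch // illustrative only

import (
	cid "github.com/ipfs/go-cid"
	"github.com/sourcenetwork/defradb/internal/core"
	coreblock "github.com/sourcenetwork/defradb/internal/core/block"
)

// buildAndWalk creates a block whose parents are prevHeads and whose field
// updates are fieldLinks, then collects every linked CID via Links2.
func buildAndWalk(delta core.Delta, fieldLinks []coreblock.DAGLink, prevHeads ...cid.Cid) []cid.Cid {
	// New sorts the heads and stores them on Block.Heads; fieldLinks stay on Block.Links.
	block := coreblock.New(delta, fieldLinks, prevHeads...)

	// Links2 yields the heads first, followed by the remaining DAG links.
	all := block.Links2()
	out := make([]cid.Cid, 0, len(all))
	for _, l := range all {
		out = append(out, l.Cid)
	}
	return out
}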
15 changes: 5 additions & 10 deletions internal/core/block/block_test.go
@@ -21,7 +21,6 @@ import (
"github.com/ipld/go-ipld-prime/storage/memstore"
"github.com/stretchr/testify/require"

"github.com/sourcenetwork/defradb/internal/core"
"github.com/sourcenetwork/defradb/internal/core/crdt"
)

@@ -75,11 +74,8 @@ func generateBlocks(lsys *linking.LinkSystem) (cidlink.Link, error) {
Data: []byte("Johny"),
},
},
Links: []DAGLink{
{
Name: core.HEAD,
Link: fieldBlockLink.(cidlink.Link),
},
Heads: []cidlink.Link{
fieldBlockLink.(cidlink.Link),
},
}
fieldUpdateBlockLink, err := lsys.Store(ipld.LinkContext{}, GetLinkPrototype(), fieldUpdateBlock.GenerateNode())
@@ -97,11 +93,10 @@ func generateBlocks(lsys *linking.LinkSystem) (cidlink.Link, error) {
Status: 1,
},
},
Heads: []cidlink.Link{
compositeBlockLink.(cidlink.Link),
},
Links: []DAGLink{
{
Name: core.HEAD,
Link: compositeBlockLink.(cidlink.Link),
},
{
Name: "name",
Link: fieldUpdateBlockLink.(cidlink.Link),
1 change: 0 additions & 1 deletion internal/core/type.go
@@ -12,5 +12,4 @@ package core

const (
COMPOSITE_NAMESPACE = "C"
HEAD = "_head"
)
15 changes: 2 additions & 13 deletions internal/db/fetcher/versioned.go
@@ -312,20 +312,14 @@ func (vf *VersionedFetcher) seekNext(c cid.Cid, topParent bool) error {
}

// only seekNext on parent if we have a HEAD link
l, ok := block.GetLinkByName(core.HEAD)
if ok {
err := vf.seekNext(l.Cid, true)
if len(block.Heads) != 0 {
err := vf.seekNext(block.Heads[0].Cid, true)
if err != nil {
return err
}
}

// loop over links and ignore head links
for _, l := range block.Links {
if l.Name == core.HEAD {
continue
}

err := vf.seekNext(l.Link.Cid, false)
if err != nil {
return err
@@ -362,12 +356,7 @@ func (vf *VersionedFetcher) merge(c cid.Cid) error {
}

// handle subgraphs
// loop over links and ignore head links
for _, l := range block.Links {
if l.Name == core.HEAD {
continue
}

// get node
subBlock, err := vf.getDAGBlock(l.Link.Cid)
if err != nil {
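A hedged sketch of the traversal split the fetcher relies on after this change: the parent chain is followed through Heads, and Links can now be walked without filtering out head entries. The helper below is illustrative, not part of the fetcher.

package sketch // illustrative only

import (
	cid "github.com/ipfs/go-cid"
	coreblock "github.com/sourcenetwork/defradb/internal/core/block"
)

// walkBlock follows the parent chain via Heads and visits every field link in Links.
func walkBlock(block *coreblock.Block, visitParent, visitField func(cid.Cid) error) error {
	if len(block.Heads) != 0 {
		// seekNext above only follows the first head when walking back in time.
		if err := visitParent(block.Heads[0].Cid); err != nil {
			return err
		}
	}
	for _, l := range block.Links {
		// No Name == "_head" check is needed; these are all field links now.
		if err := visitField(l.Link.Cid); err != nil {
			return err
		}
	}
	return nil
}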
34 changes: 14 additions & 20 deletions internal/db/merge.go
@@ -204,30 +204,28 @@ func (mp *mergeProcessor) loadComposites(
// In this case, we also need to walk back the merge target's DAG until we reach a common block.
if block.Delta.GetPriority() >= mt.headHeight {
mp.composites.PushFront(block)
for _, prevCid := range block.GetHeadLinks() {
err := mp.loadComposites(ctx, prevCid, mt)
for _, head := range block.Heads {
err := mp.loadComposites(ctx, head.Cid, mt)
if err != nil {
return err
}
}
} else {
newMT := newMergeTarget()
for _, b := range mt.heads {
for _, link := range b.Links {
if link.Name == core.HEAD {
nd, err := mp.blockLS.Load(linking.LinkContext{Ctx: ctx}, link.Link, coreblock.SchemaPrototype)
if err != nil {
return err
}

childBlock, err := coreblock.GetFromNode(nd)
if err != nil {
return err
}

newMT.heads[link.Cid] = childBlock
newMT.headHeight = childBlock.Delta.GetPriority()
for _, link := range b.Heads {
nd, err := mp.blockLS.Load(linking.LinkContext{Ctx: ctx}, link, coreblock.SchemaPrototype)
if err != nil {
return err
}

childBlock, err := coreblock.GetFromNode(nd)
if err != nil {
return err
}

newMT.heads[link.Cid] = childBlock
newMT.headHeight = childBlock.Delta.GetPriority()
}
}
return mp.loadComposites(ctx, blockCid, newMT)
@@ -387,10 +385,6 @@ func (mp *mergeProcessor) processBlock(
}

for _, link := range dagBlock.Links {
if link.Name == core.HEAD {
continue
}

nd, err := mp.blockLS.Load(linking.LinkContext{Ctx: ctx}, link.Link, coreblock.SchemaPrototype)
if err != nil {
return err
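A hedged restatement of the walk-back branch in loadComposites above: when the incoming block is older than the current merge target, every target head is replaced by its own parents, loaded through its Heads links, before retrying. The helper below is illustrative; loadBlock stands in for the mp.blockLS load in the real code.

package sketch // illustrative only

import (
	cid "github.com/ipfs/go-cid"
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
	coreblock "github.com/sourcenetwork/defradb/internal/core/block"
)

// olderGeneration maps each head block's parent CIDs to the loaded parent blocks.
func olderGeneration(
	heads []*coreblock.Block,
	loadBlock func(cidlink.Link) (*coreblock.Block, error),
) (map[cid.Cid]*coreblock.Block, error) {
	parents := make(map[cid.Cid]*coreblock.Block, len(heads))
	for _, b := range heads {
		for _, link := range b.Heads {
			parent, err := loadBlock(link)
			if err != nil {
				return nil, err
			}
			parents[link.Cid] = parent
		}
	}
	return parents, nil
}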
11 changes: 5 additions & 6 deletions internal/db/merge_test.go
@@ -22,7 +22,6 @@ import (

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/event"
"github.com/sourcenetwork/defradb/internal/core"
coreblock "github.com/sourcenetwork/defradb/internal/core/block"
"github.com/sourcenetwork/defradb/internal/core/crdt"
)
@@ -228,14 +227,13 @@ type compositeInfo struct {
}

func (d *dagBuilder) generateCompositeUpdate(lsys *linking.LinkSystem, fields map[string]any, from compositeInfo) (compositeInfo, error) {
links := []coreblock.DAGLink{}
heads := []cidlink.Link{}
newPriority := from.height + 1
if from.link.ByteLen() != 0 {
links = append(links, coreblock.DAGLink{
Name: core.HEAD,
Link: from.link,
})
heads = append(heads, from.link)
}

links := []coreblock.DAGLink{}
for field, val := range fields {
d.fieldsHeight[field]++
// Generate new Block and save to lsys
@@ -270,6 +268,7 @@ func (d *dagBuilder) generateCompositeUpdate(lsys *linking.LinkSystem, fields ma
Status: 1,
},
},
Heads: heads,
Links: links,
}

12 changes: 2 additions & 10 deletions internal/merkle/clock/clock.go
@@ -217,7 +217,7 @@ func encryptBlock(
return nil, err
}
clonedCRDT.SetData(bytes)
return &coreblock.Block{Delta: clonedCRDT, Links: block.Links}, nil
return &coreblock.Block{Delta: clonedCRDT, Heads: block.Heads, Links: block.Links}, nil
}

// ProcessBlock merges the delta CRDT and updates the state accordingly.
@@ -241,15 +241,7 @@ func (mc *MerkleClock) updateHeads(
) error {
priority := block.Delta.GetPriority()

// check if we have any HEAD links
hasHeads := false
for _, l := range block.Links {
if l.Name == core.HEAD {
hasHeads = true
break
}
}
if !hasHeads { // reached the bottom, at a leaf
if len(block.Heads) == 0 { // reached the bottom, at a leaf
err := mc.headset.Write(ctx, blockLink.Cid, priority)
if err != nil {
return NewErrAddingHead(blockLink.Cid, err)
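With heads in their own field, the clock's "is this the first block" test reduces to a length check instead of a scan of Links for the "_head" name; a short hedged restatement:

package sketch // illustrative only

import coreblock "github.com/sourcenetwork/defradb/internal/core/block"

// isFirstBlock reports whether the block has no parents (a leaf of the Merkle
// DAG), which is the case where updateHeads writes the block CID into the headset.
func isFirstBlock(block *coreblock.Block) bool {
	return len(block.Heads) == 0
}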
41 changes: 20 additions & 21 deletions internal/planner/commit.go
@@ -12,7 +12,6 @@ package planner

import (
cid "github.com/ipfs/go-cid"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"

"github.com/sourcenetwork/immutable"

@@ -221,7 +220,7 @@ func (n *dagScanNode) Next() (bool, error) {
return false, err
}

currentValue, heads, err := n.dagBlockToNodeDoc(dagBlock)
currentValue, err := n.dagBlockToNodeDoc(dagBlock)
if err != nil {
return false, err
}
@@ -239,10 +238,10 @@
if !n.commitSelect.Depth.HasValue() || n.depthVisited < n.commitSelect.Depth.Value() {
// Insert the newly fetched cids into the slice of queued items, in reverse order
// so that the last new cid will be at the front of the slice
n.queuedCids = append(make([]*cid.Cid, len(heads)), n.queuedCids...)
n.queuedCids = append(make([]*cid.Cid, len(dagBlock.Heads)), n.queuedCids...)

for i, head := range heads {
n.queuedCids[len(heads)-i-1] = &head.Cid
for i, head := range dagBlock.Heads {
n.queuedCids[len(dagBlock.Heads)-i-1] = &head.Cid
}
}

@@ -293,11 +292,11 @@ which returns the current dag commit for the stored CRDT value.
All the dagScanNode endpoints use similar structures
*/

func (n *dagScanNode) dagBlockToNodeDoc(block *coreblock.Block) (core.Doc, []cidlink.Link, error) {
func (n *dagScanNode) dagBlockToNodeDoc(block *coreblock.Block) (core.Doc, error) {
commit := n.commitSelect.DocumentMapping.NewDoc()
link, err := block.GenerateLink()
if err != nil {
return core.Doc{}, nil, err
return core.Doc{}, err
}
n.commitSelect.DocumentMapping.SetFirstOfName(&commit, request.CidFieldName, link.String())

@@ -323,17 +322,17 @@ func (n *dagScanNode) dagBlockToNodeDoc(block *coreblock.Block) (core.Doc, []cid
},
)
if err != nil {
return core.Doc{}, nil, err
return core.Doc{}, err
}
if len(cols) == 0 {
return core.Doc{}, nil, client.NewErrCollectionNotFoundForSchemaVersion(schemaVersionId)
return core.Doc{}, client.NewErrCollectionNotFoundForSchemaVersion(schemaVersionId)
}

// Because we only care about the schema, we can safely take the first - the schema is the same
// for all in the set.
field, ok := cols[0].Definition().GetFieldByName(fName)
if !ok {
return core.Doc{}, nil, client.NewErrFieldNotExist(fName)
return core.Doc{}, client.NewErrFieldNotExist(fName)
}
fieldID = field.ID.String()
}
@@ -362,10 +361,10 @@ func (n *dagScanNode) dagBlockToNodeDoc(block *coreblock.Block) (core.Doc, []cid
},
)
if err != nil {
return core.Doc{}, nil, err
return core.Doc{}, err
}
if len(cols) == 0 {
return core.Doc{}, nil, client.NewErrCollectionNotFoundForSchemaVersion(schemaVersionId)
return core.Doc{}, client.NewErrCollectionNotFoundForSchemaVersion(schemaVersionId)
}

// WARNING: This will become incorrect once we allow multiple collections to share the same schema,
Expand All @@ -374,15 +373,21 @@ func (n *dagScanNode) dagBlockToNodeDoc(block *coreblock.Block) (core.Doc, []cid
n.commitSelect.DocumentMapping.SetFirstOfName(&commit,
request.CollectionIDFieldName, int64(cols[0].ID()))

heads := make([]cidlink.Link, 0)

// links
linksIndexes := n.commitSelect.DocumentMapping.IndexesByName[request.LinksFieldName]

for _, linksIndex := range linksIndexes {
links := make([]core.Doc, len(block.Links))
linksMapping := n.commitSelect.DocumentMapping.ChildMappings[linksIndex]

for i, l := range block.Heads {
link := linksMapping.NewDoc()
linksMapping.SetFirstOfName(&link, request.LinksNameFieldName, "_head")
linksMapping.SetFirstOfName(&link, request.LinksCidFieldName, l.Cid.String())

links[i] = link
}

for i, l := range block.Links {
link := linksMapping.NewDoc()
linksMapping.SetFirstOfName(&link, request.LinksNameFieldName, l.Name)
@@ -394,11 +399,5 @@ func (n *dagScanNode) dagBlockToNodeDoc(block *coreblock.Block) (core.Doc, []cid
commit.Fields[linksIndex] = links
}

for _, l := range block.Links {
if l.Name == "_head" {
heads = append(heads, l.Link)
}
}

return commit, heads, nil
return commit, nil
}
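The planner change above keeps the query-facing shape stable: heads are still reported in the commit's links under the legacy "_head" name, even though blocks no longer store that string. A hedged sketch of the resulting mapping, with linkDoc as an illustrative stand-in for the mapped core.Doc:

package sketch // illustrative only

import coreblock "github.com/sourcenetwork/defradb/internal/core/block"

// linkDoc is a stand-in for the planner's mapped link document.
type linkDoc struct {
	Name string
	Cid  string
}

// commitLinks mirrors dagBlockToNodeDoc's links output: heads first, under the
// legacy "_head" name, followed by the named field links.
func commitLinks(block *coreblock.Block) []linkDoc {
	out := make([]linkDoc, 0, len(block.Heads)+len(block.Links))
	for _, head := range block.Heads {
		out = append(out, linkDoc{Name: "_head", Cid: head.Cid.String()})
	}
	for _, l := range block.Links {
		out = append(out, linkDoc{Name: l.Name, Cid: l.Link.Cid.String()})
	}
	return out
}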