Skip to content

Commit

Permalink
WIP - Extract heads from links
Browse files Browse the repository at this point in the history
Now we no longer need to store const string values in the blocks
  • Loading branch information
AndrewSisley committed Sep 23, 2024
1 parent 15f857d commit 3c91d3a
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 74 deletions.
31 changes: 18 additions & 13 deletions internal/core/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ type Encryption struct {
type Block struct {
// Delta is the CRDT delta that is stored in the block.
Delta crdt.CRDT

Heads []cidlink.Link

// Links are the links to other blocks in the DAG.
Links []DAGLink
// Encryption contains the encryption information for the block's delta.
Expand All @@ -142,15 +145,16 @@ func (block *Block) Clone() *Block {
}
}

// GetHeadLinks returns the CIDs of the previous blocks. There can be more than 1 with multiple heads.
func (block *Block) GetHeadLinks() []cid.Cid {
var heads []cid.Cid
func (block *Block) Links2() []cidlink.Link {
result := make([]cidlink.Link, 0, len(block.Heads)+len(block.Links))

result = append(result, block.Heads...)

for _, link := range block.Links {
if link.Name == core.HEAD {
heads = append(heads, link.Cid)
}
result = append(result, link.Link)
}
return heads

return result
}

// IPLDSchemaBytes returns the IPLD schema representation for the block.
Expand All @@ -160,6 +164,7 @@ func (block *Block) IPLDSchemaBytes() []byte {
return []byte(`
type Block struct {
delta CRDT
heads [Link]
links [DAGLink]
encryption optional Link
}
Expand Down Expand Up @@ -188,13 +193,12 @@ func New(delta core.Delta, links []DAGLink, heads ...cid.Cid) *Block {
sort.Slice(heads, func(i, j int) bool {
return strings.Compare(heads[i].String(), heads[j].String()) < 0
})

headLinks := make([]cidlink.Link, len(heads))
for _, head := range heads {
blockLinks = append(
blockLinks,
DAGLink{
Name: core.HEAD,
Link: cidlink.Link{Cid: head},
},
headLinks = append(
headLinks,
cidlink.Link{Cid: head},
)
}

Expand All @@ -207,6 +211,7 @@ func New(delta core.Delta, links []DAGLink, heads ...cid.Cid) *Block {
blockLinks = append(blockLinks, links...)

return &Block{
Heads: headLinks,
Links: blockLinks,
Delta: crdt.NewCRDT(delta),
}
Expand Down
15 changes: 5 additions & 10 deletions internal/core/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/ipld/go-ipld-prime/storage/memstore"
"github.com/stretchr/testify/require"

"github.com/sourcenetwork/defradb/internal/core"
"github.com/sourcenetwork/defradb/internal/core/crdt"
)

Expand Down Expand Up @@ -75,11 +74,8 @@ func generateBlocks(lsys *linking.LinkSystem) (cidlink.Link, error) {
Data: []byte("Johny"),
},
},
Links: []DAGLink{
{
Name: core.HEAD,
Link: fieldBlockLink.(cidlink.Link),
},
Heads: []cidlink.Link{
fieldBlockLink.(cidlink.Link),
},
}
fieldUpdateBlockLink, err := lsys.Store(ipld.LinkContext{}, GetLinkPrototype(), fieldUpdateBlock.GenerateNode())
Expand All @@ -97,11 +93,10 @@ func generateBlocks(lsys *linking.LinkSystem) (cidlink.Link, error) {
Status: 1,
},
},
Heads: []cidlink.Link{
compositeBlockLink.(cidlink.Link),
},
Links: []DAGLink{
{
Name: core.HEAD,
Link: compositeBlockLink.(cidlink.Link),
},
{
Name: "name",
Link: fieldUpdateBlockLink.(cidlink.Link),
Expand Down
1 change: 0 additions & 1 deletion internal/core/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,4 @@ package core

const (
COMPOSITE_NAMESPACE = "C"
HEAD = "_head"
)
15 changes: 2 additions & 13 deletions internal/db/fetcher/versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,20 +312,14 @@ func (vf *VersionedFetcher) seekNext(c cid.Cid, topParent bool) error {
}

// only seekNext on parent if we have a HEAD link
l, ok := block.GetLinkByName(core.HEAD)
if ok {
err := vf.seekNext(l.Cid, true)
if len(block.Heads) != 0 {
err := vf.seekNext(block.Heads[0].Cid, true)
if err != nil {
return err
}
}

// loop over links and ignore head links
for _, l := range block.Links {
if l.Name == core.HEAD {
continue
}

err := vf.seekNext(l.Link.Cid, false)
if err != nil {
return err
Expand Down Expand Up @@ -362,12 +356,7 @@ func (vf *VersionedFetcher) merge(c cid.Cid) error {
}

// handle subgraphs
// loop over links and ignore head links
for _, l := range block.Links {
if l.Name == core.HEAD {
continue
}

// get node
subBlock, err := vf.getDAGBlock(l.Link.Cid)
if err != nil {
Expand Down
34 changes: 14 additions & 20 deletions internal/db/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,30 +204,28 @@ func (mp *mergeProcessor) loadComposites(
// In this case, we also need to walk back the merge target's DAG until we reach a common block.
if block.Delta.GetPriority() >= mt.headHeight {
mp.composites.PushFront(block)
for _, prevCid := range block.GetHeadLinks() {
err := mp.loadComposites(ctx, prevCid, mt)
for _, head := range block.Heads {
err := mp.loadComposites(ctx, head.Cid, mt)
if err != nil {
return err
}
}
} else {
newMT := newMergeTarget()
for _, b := range mt.heads {
for _, link := range b.Links {
if link.Name == core.HEAD {
nd, err := mp.blockLS.Load(linking.LinkContext{Ctx: ctx}, link.Link, coreblock.SchemaPrototype)
if err != nil {
return err
}

childBlock, err := coreblock.GetFromNode(nd)
if err != nil {
return err
}

newMT.heads[link.Cid] = childBlock
newMT.headHeight = childBlock.Delta.GetPriority()
for _, link := range b.Heads {
nd, err := mp.blockLS.Load(linking.LinkContext{Ctx: ctx}, link, coreblock.SchemaPrototype)
if err != nil {
return err
}

childBlock, err := coreblock.GetFromNode(nd)
if err != nil {
return err
}

newMT.heads[link.Cid] = childBlock
newMT.headHeight = childBlock.Delta.GetPriority()
}
}
return mp.loadComposites(ctx, blockCid, newMT)
Expand Down Expand Up @@ -387,10 +385,6 @@ func (mp *mergeProcessor) processBlock(
}

for _, link := range dagBlock.Links {
if link.Name == core.HEAD {
continue
}

nd, err := mp.blockLS.Load(linking.LinkContext{Ctx: ctx}, link.Link, coreblock.SchemaPrototype)
if err != nil {
return err
Expand Down
11 changes: 5 additions & 6 deletions internal/db/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/event"
"github.com/sourcenetwork/defradb/internal/core"
coreblock "github.com/sourcenetwork/defradb/internal/core/block"
"github.com/sourcenetwork/defradb/internal/core/crdt"
)
Expand Down Expand Up @@ -228,14 +227,13 @@ type compositeInfo struct {
}

func (d *dagBuilder) generateCompositeUpdate(lsys *linking.LinkSystem, fields map[string]any, from compositeInfo) (compositeInfo, error) {
links := []coreblock.DAGLink{}
heads := []cidlink.Link{}
newPriority := from.height + 1
if from.link.ByteLen() != 0 {
links = append(links, coreblock.DAGLink{
Name: core.HEAD,
Link: from.link,
})
heads = append(heads, from.link)
}

links := []coreblock.DAGLink{}
for field, val := range fields {
d.fieldsHeight[field]++
// Generate new Block and save to lsys
Expand Down Expand Up @@ -270,6 +268,7 @@ func (d *dagBuilder) generateCompositeUpdate(lsys *linking.LinkSystem, fields ma
Status: 1,
},
},
Heads: heads,
Links: links,
}

Expand Down
10 changes: 1 addition & 9 deletions internal/merkle/clock/clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,15 +241,7 @@ func (mc *MerkleClock) updateHeads(
) error {
priority := block.Delta.GetPriority()

// check if we have any HEAD links
hasHeads := false
for _, l := range block.Links {
if l.Name == core.HEAD {
hasHeads = true
break
}
}
if !hasHeads { // reached the bottom, at a leaf
if len(block.Heads) == 0 { // reached the bottom, at a leaf
err := mc.headset.Write(ctx, blockLink.Cid, priority)
if err != nil {
return NewErrAddingHead(blockLink.Cid, err)
Expand Down
4 changes: 2 additions & 2 deletions net/sync_dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ func loadBlockLinks(ctx context.Context, lsys linking.LinkSystem, block *coreblo
cancel()
}

for _, lnk := range block.Links {
for _, lnk := range block.Links2() {
wg.Add(1)
go func(lnk coreblock.DAGLink) {
go func(lnk cidlink.Link) {
defer wg.Done()
if ctx.Err() != nil {
return
Expand Down

0 comments on commit 3c91d3a

Please sign in to comment.