Skip to content

Commit

Permalink
refactor: CRDT merge direction (#2016)
Browse files Browse the repository at this point in the history
## Relevant issue(s)

Resolves #1066 

## Description

This PR changes the CRDT merge direction. Originally, when an update
arrived over the P2P network, each block was added to the DAG store and
merged into the datastore in the order that they were received (newest
to oldest block).

With this update, we start by receiving all blocks up to the version
that we currently have (or until we reach the root) and then we apply
the merge from oldest to newest block. This will enable more CRDT types
to be added.
  • Loading branch information
fredcarle authored Nov 1, 2023
1 parent b6abbb6 commit 597d8f3
Show file tree
Hide file tree
Showing 10 changed files with 206 additions and 367 deletions.
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,11 @@ test\:coverage:
@$(MAKE) deps:lens
@$(MAKE) clean:coverage
mkdir $(COVERAGE_DIRECTORY)
ifeq ($(path),)
gotestsum --format testname -- ./... $(TEST_FLAGS) $(COVERAGE_FLAGS)
else
gotestsum --format testname -- $(path) $(TEST_FLAGS) $(COVERAGE_FLAGS)
endif
go tool covdata textfmt -i=$(COVERAGE_DIRECTORY) -o $(COVERAGE_FILE)

.PHONY: test\:coverage-func
Expand All @@ -266,8 +270,10 @@ test\:coverage-func:

.PHONY: test\:coverage-html
test\:coverage-html:
@$(MAKE) test:coverage
@$(MAKE) test:coverage path=$(path)
go tool cover -html=$(COVERAGE_FILE)
@$(MAKE) clean:coverage


.PHONY: test\:changes
test\:changes:
Expand Down
3 changes: 1 addition & 2 deletions core/clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package core
import (
"context"

cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
)

Expand All @@ -24,5 +23,5 @@ type MerkleClock interface {
ctx context.Context,
delta Delta,
) (ipld.Node, error) // possibly change to AddDeltaNode?
ProcessNode(context.Context, NodeGetter, Delta, ipld.Node) ([]cid.Cid, error)
ProcessNode(context.Context, Delta, ipld.Node) error
}
14 changes: 0 additions & 14 deletions core/crdt/composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,20 +196,6 @@ func (c CompositeDAG) Merge(ctx context.Context, delta core.Delta) error {
}

func (c CompositeDAG) deleteWithPrefix(ctx context.Context, key core.DataStoreKey) error {
val, err := c.store.Get(ctx, key.ToDS())
if err != nil && !errors.Is(err, ds.ErrNotFound) {
return err
}
if !errors.Is(err, ds.ErrNotFound) {
err = c.store.Put(ctx, c.key.WithDeletedFlag().ToDS(), val)
if err != nil {
return err
}
err = c.store.Delete(ctx, key.ToDS())
if err != nil {
return err
}
}
q := query.Query{
Prefix: key.ToString(),
}
Expand Down
2 changes: 1 addition & 1 deletion db/fetcher/versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ func (vf *VersionedFetcher) processNode(
return err
}

_, err = mcrdt.Clock().ProcessNode(vf.ctx, nil, delta, nd)
err = mcrdt.Clock().ProcessNode(vf.ctx, delta, nd)
return err
}

Expand Down
23 changes: 8 additions & 15 deletions merkle/clock/clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,8 @@ func (mc *MerkleClock) AddDAGNode(
}

// apply the new node and merge the delta with state
// @todo Remove NodeGetter as a parameter, and move it to a MerkleClock field
_, err = mc.ProcessNode(
err = mc.ProcessNode(
ctx,
&CrdtNodeGetter{DeltaExtractor: mc.crdt.DeltaDecode},
delta,
nd,
)
Expand All @@ -117,17 +115,16 @@ func (mc *MerkleClock) AddDAGNode(
// ProcessNode processes an already merged delta into a CRDT by adding it to the state.
func (mc *MerkleClock) ProcessNode(
ctx context.Context,
ng core.NodeGetter,
delta core.Delta,
node ipld.Node,
) ([]cid.Cid, error) {
) error {
nodeCid := node.Cid()
priority := delta.GetPriority()

log.Debug(ctx, "Running ProcessNode", logging.NewKV("CID", nodeCid))
err := mc.crdt.Merge(ctx, delta)
if err != nil {
return nil, NewErrMergingDelta(nodeCid, err)
return NewErrMergingDelta(nodeCid, err)
}

links := node.Links()
Expand All @@ -145,18 +142,16 @@ func (mc *MerkleClock) ProcessNode(
log.Debug(ctx, "No heads found")
err := mc.headset.Write(ctx, nodeCid, priority)
if err != nil {
return nil, NewErrAddingHead(nodeCid, err)
return NewErrAddingHead(nodeCid, err)
}
}

children := []cid.Cid{}

for _, l := range links {
linkCid := l.Cid
log.Debug(ctx, "Scanning for replacement heads", logging.NewKV("Child", linkCid))
isHead, err := mc.headset.IsHead(ctx, linkCid)
if err != nil {
return nil, NewErrCheckingHead(linkCid, err)
return NewErrCheckingHead(linkCid, err)
}

if isHead {
Expand All @@ -165,15 +160,15 @@ func (mc *MerkleClock) ProcessNode(
// of current branch
err = mc.headset.Replace(ctx, linkCid, nodeCid, priority)
if err != nil {
return nil, NewErrReplacingHead(linkCid, nodeCid, err)
return NewErrReplacingHead(linkCid, nodeCid, err)
}

continue
}

known, err := mc.dagstore.Has(ctx, linkCid)
if err != nil {
return nil, NewErrCouldNotFindBlock(linkCid, err)
return NewErrCouldNotFindBlock(linkCid, err)
}
if known {
// we reached a non-head node in the known tree.
Expand All @@ -192,11 +187,9 @@ func (mc *MerkleClock) ProcessNode(
}
continue
}

children = append(children, linkCid)
}

return children, nil
return nil
}

// Heads returns the current heads of the MerkleClock.
Expand Down
81 changes: 26 additions & 55 deletions net/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ import (
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/core"
"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/logging"
)

Expand Down Expand Up @@ -50,17 +47,10 @@ type SessionDAGSyncer interface {
}

type dagJob struct {
session *sync.WaitGroup // A waitgroup to wait for all related jobs to conclude
nodeGetter ipld.NodeGetter // a node getter to use
node ipld.Node // the current ipld Node

collection client.Collection // collection our document belongs to
dsKey core.DataStoreKey // datastore key of our document
fieldName string // field of the subgraph our node belongs to

// Transaction common to a pushlog event. It is used to pass it along to processLog
// and handleChildBlocks within the dagWorker.
txn datastore.Txn
session *sync.WaitGroup // A waitgroup to wait for all related jobs to conclude
bp *blockProcessor // the block processor to use
cid cid.Cid // the cid of the block to fetch from the P2P network
isComposite bool // whether this is a composite block

// OLD FIELDS
// root cid.Cid // the root of the branch we are walking down
Expand All @@ -87,13 +77,13 @@ func (p *Peer) sendJobWorker() {
return

case newJob := <-p.sendJobs:
jobs, ok := docWorkerQueue[newJob.dsKey.DocKey]
jobs, ok := docWorkerQueue[newJob.bp.dsKey.DocKey]
if !ok {
jobs = make(chan *dagJob, numWorkers)
for i := 0; i < numWorkers; i++ {
go p.dagWorker(jobs)
}
docWorkerQueue[newJob.dsKey.DocKey] = jobs
docWorkerQueue[newJob.bp.dsKey.DocKey] = jobs
}
jobs <- newJob

Expand All @@ -113,8 +103,8 @@ func (p *Peer) dagWorker(jobs chan *dagJob) {
log.Debug(
p.ctx,
"Starting new job from DAG queue",
logging.NewKV("Datastore Key", job.dsKey),
logging.NewKV("CID", job.node.Cid()),
logging.NewKV("Datastore Key", job.bp.dsKey),
logging.NewKV("CID", job.cid),
)

select {
Expand All @@ -125,44 +115,25 @@ func (p *Peer) dagWorker(jobs chan *dagJob) {
default:
}

children, err := p.processLog(
p.ctx,
job.txn,
job.collection,
job.dsKey,
job.fieldName,
job.node,
job.nodeGetter,
true,
)
if err != nil {
log.ErrorE(
p.ctx,
"Error processing log",
err,
logging.NewKV("Datastore key", job.dsKey),
logging.NewKV("CID", job.node.Cid()),
)
job.session.Done()
continue
}

if len(children) == 0 {
job.session.Done()
continue
}

go func(j *dagJob) {
p.handleChildBlocks(
j.session,
j.txn,
j.collection,
j.dsKey,
j.fieldName,
j.node,
children,
j.nodeGetter,
)
if j.bp.getter != nil && j.cid.Defined() {
cNode, err := j.bp.getter.Get(p.ctx, j.cid)
if err != nil {
log.ErrorE(p.ctx, "Failed to get node", err, logging.NewKV("CID", j.cid))
j.session.Done()
return
}
err = j.bp.processRemoteBlock(
p.ctx,
j.session,
cNode,
j.isComposite,
)
if err != nil {
log.ErrorE(p.ctx, "Failed to process remote block", err, logging.NewKV("CID", j.cid))
}
}
p.queuedChildren.Remove(j.cid)
j.session.Done()
}(job)
}
Expand Down
Loading

0 comments on commit 597d8f3

Please sign in to comment.