Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add tests for merge
Browse files Browse the repository at this point in the history
fredcarle committed Jun 7, 2024
1 parent d779a7f commit c5660b4
Showing 4 changed files with 327 additions and 15 deletions.
2 changes: 1 addition & 1 deletion internal/db/db_test.go
Original file line number Diff line number Diff line change
@@ -32,7 +32,7 @@ func newMemoryDB(ctx context.Context) (*db, error) {

func newDefraMemoryDB(ctx context.Context) (*db, error) {
rootstore := memory.NewDatastore(ctx)
return newDB(ctx, rootstore, acp.NoACP)
return newDB(ctx, rootstore, acp.NoACP, nil)
}

func TestNewDB(t *testing.T) {
14 changes: 7 additions & 7 deletions internal/db/merge.go
Original file line number Diff line number Diff line change
@@ -118,7 +118,7 @@ func (db *db) executeMerge(ctx context.Context, dagMerge events.DAGMerge) error
}
ctx = SetContextTxn(ctx, txn)
mp.txn = txn
mp.ls.SetReadStorage(txn.DAGstore().AsIPLDStorage())
mp.lsys.SetReadStorage(txn.DAGstore().AsIPLDStorage())
// Reset the CRDTs to avoid reusing the old transaction.
mp.mCRDTs = make(map[string]merklecrdt.MerkleCRDT)
continue
@@ -133,7 +133,7 @@ func (db *db) executeMerge(ctx context.Context, dagMerge events.DAGMerge) error

type mergeProcessor struct {
txn datastore.Txn
ls linking.LinkSystem
lsys linking.LinkSystem
mCRDTs map[string]merklecrdt.MerkleCRDT
col *collection
dsKey core.DataStoreKey
@@ -142,13 +142,13 @@ type mergeProcessor struct {

func (db *db) newMergeProcessor(
txn datastore.Txn,
ls linking.LinkSystem,
lsys linking.LinkSystem,
col *collection,
dsKey core.DataStoreKey,
) (*mergeProcessor, error) {
return &mergeProcessor{
txn: txn,
ls: ls,
lsys: lsys,
mCRDTs: make(map[string]merklecrdt.MerkleCRDT),
col: col,
dsKey: dsKey,
@@ -179,7 +179,7 @@ func (mp *mergeProcessor) loadComposites(
return nil
}

nd, err := mp.ls.Load(linking.LinkContext{Ctx: ctx}, cidlink.Link{Cid: blockCid}, coreblock.SchemaPrototype)
nd, err := mp.lsys.Load(linking.LinkContext{Ctx: ctx}, cidlink.Link{Cid: blockCid}, coreblock.SchemaPrototype)
if err != nil {
return err
}
@@ -207,7 +207,7 @@ func (mp *mergeProcessor) loadComposites(
for _, b := range mt.heads {
for _, link := range b.Links {
if link.Name == core.HEAD {
nd, err := mp.ls.Load(linking.LinkContext{Ctx: ctx}, link.Link, coreblock.SchemaPrototype)
nd, err := mp.lsys.Load(linking.LinkContext{Ctx: ctx}, link.Link, coreblock.SchemaPrototype)
if err != nil {
return err
}
@@ -269,7 +269,7 @@ func (mp *mergeProcessor) processBlock(
continue
}

nd, err := mp.ls.Load(linking.LinkContext{Ctx: ctx}, link.Link, coreblock.SchemaPrototype)
nd, err := mp.lsys.Load(linking.LinkContext{Ctx: ctx}, link.Link, coreblock.SchemaPrototype)
if err != nil {
return err
}
313 changes: 313 additions & 0 deletions internal/db/merge_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,313 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package db

import (
"context"
"testing"

"github.com/ipld/go-ipld-prime/linking"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/stretchr/testify/require"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/events"
"github.com/sourcenetwork/defradb/internal/core"
coreblock "github.com/sourcenetwork/defradb/internal/core/block"
"github.com/sourcenetwork/defradb/internal/db/base"
"github.com/sourcenetwork/defradb/internal/merkle/clock"
)

const userSchema = `
type User {
name: String
age: Int
email: String
points: Int
}
`

func TestMerge_NoError(t *testing.T) {
// Test that a merge can be performed up to the provided CID.
ctx := context.Background()

// Setup the "local" database
localDB, err := newDefraMemoryDB(ctx)
require.NoError(t, err)
_, err = localDB.AddSchema(ctx, userSchema)
require.NoError(t, err)
localCol, err := localDB.GetCollectionByName(ctx, "User")
require.NoError(t, err)
docMap := map[string]any{
"name": "Alice",
"age": 30,
}
doc, err := client.NewDocFromMap(docMap, localCol.Definition())
require.NoError(t, err)

err = localCol.Create(ctx, doc)
require.NoError(t, err)

// Setup the "remote" database
remoteDB, err := newDefraMemoryDB(ctx)
require.NoError(t, err)
_, err = remoteDB.AddSchema(ctx, userSchema)
require.NoError(t, err)
remoteCol, err := remoteDB.GetCollectionByName(ctx, "User")
require.NoError(t, err)
doc, err = client.NewDocFromMap(docMap, localCol.Definition())
require.NoError(t, err)
err = remoteCol.Create(ctx, doc)
require.NoError(t, err)

// Add a few changes to the remote node
err = doc.Set("points", 100)
require.NoError(t, err)
err = remoteCol.Update(ctx, doc)
require.NoError(t, err)

// Sync the remote blocks to the local node
err = syncAndMerge(ctx, remoteDB, localDB, remoteCol, localCol, doc.ID().String())
require.NoError(t, err)

// verify the local node has the same data as the remote node
localDoc, err := localCol.Get(ctx, doc.ID(), false)
require.NoError(t, err)
localDocString, err := localDoc.String()
require.NoError(t, err)
remoteDoc, err := remoteCol.Get(ctx, doc.ID(), false)
require.NoError(t, err)
remoteDocString, err := remoteDoc.String()
require.NoError(t, err)
require.Equal(t, remoteDocString, localDocString)
}

func TestMerge_DelayedSync_NoError(t *testing.T) {
// Test that a merge can be performed up to the provided CID.
ctx := context.Background()

// Setup the "local" database
localDB, err := newDefraMemoryDB(ctx)
require.NoError(t, err)
_, err = localDB.AddSchema(ctx, userSchema)
require.NoError(t, err)
localCol, err := localDB.GetCollectionByName(ctx, "User")
require.NoError(t, err)
docMap := map[string]any{
"name": "Alice",
"age": 30,
}
doc, err := client.NewDocFromMap(docMap, localCol.Definition())
require.NoError(t, err)

err = localCol.Create(ctx, doc)
require.NoError(t, err)

// Setup the "remote" database
remoteDB, err := newDefraMemoryDB(ctx)
require.NoError(t, err)
_, err = remoteDB.AddSchema(ctx, userSchema)
require.NoError(t, err)
remoteCol, err := remoteDB.GetCollectionByName(ctx, "User")
require.NoError(t, err)
doc, err = client.NewDocFromMap(docMap, localCol.Definition())
require.NoError(t, err)
err = remoteCol.Create(ctx, doc)
require.NoError(t, err)

// Add a few changes to the remote node
err = doc.Set("points", 100)
require.NoError(t, err)
err = remoteCol.Update(ctx, doc)
require.NoError(t, err)

err = doc.Set("age", 31)
require.NoError(t, err)
err = remoteCol.Update(ctx, doc)
require.NoError(t, err)

err = doc.Set("email", "alice@yahoo.com")
require.NoError(t, err)
err = remoteCol.Update(ctx, doc)
require.NoError(t, err)

// Sync the remote blocks to the local node
err = syncAndMerge(ctx, remoteDB, localDB, remoteCol, localCol, doc.ID().String())
require.NoError(t, err)

// verify the local node has the same data as the remote node
localDoc, err := localCol.Get(ctx, doc.ID(), false)
require.NoError(t, err)
localDocString, err := localDoc.String()
require.NoError(t, err)
remoteDoc, err := remoteCol.Get(ctx, doc.ID(), false)
require.NoError(t, err)
remoteDocString, err := remoteDoc.String()
require.NoError(t, err)
require.Equal(t, remoteDocString, localDocString)
}

func TestMerge_DelayedSyncTwoBranches_NoError(t *testing.T) {
// Test that a merge can be performed up to the provided CID.
ctx := context.Background()

// Setup the "local" database
localDB, err := newDefraMemoryDB(ctx)
require.NoError(t, err)
_, err = localDB.AddSchema(ctx, userSchema)
require.NoError(t, err)
localCol, err := localDB.GetCollectionByName(ctx, "User")
require.NoError(t, err)
docMap := map[string]interface{}{

Check failure on line 169 in internal/db/merge_test.go

GitHub Actions / Lint GoLang job

use-any: since GO 1.18 'interface{}' can be replaced by 'any' (revive)
"name": "Alice",
"age": 30,
}
doc, err := client.NewDocFromMap(docMap, localCol.Definition())
require.NoError(t, err)

err = localCol.Create(ctx, doc)
require.NoError(t, err)

// Setup the "remote" database
remoteDB1, err := newDefraMemoryDB(ctx)
require.NoError(t, err)
_, err = remoteDB1.AddSchema(ctx, userSchema)
require.NoError(t, err)
remoteCol1, err := remoteDB1.GetCollectionByName(ctx, "User")
require.NoError(t, err)
doc, err = client.NewDocFromMap(docMap, remoteCol1.Definition())
require.NoError(t, err)
err = remoteCol1.Create(ctx, doc)
require.NoError(t, err)

// Setup the second "remote" database
remoteDB2, err := newDefraMemoryDB(ctx)
require.NoError(t, err)
_, err = remoteDB2.AddSchema(ctx, userSchema)
require.NoError(t, err)
remoteCol2, err := remoteDB2.GetCollectionByName(ctx, "User")
require.NoError(t, err)
doc2, err := client.NewDocFromMap(docMap, remoteCol2.Definition())
require.NoError(t, err)
err = remoteCol2.Create(ctx, doc2)
require.NoError(t, err)

// Add a few changes to the remote nodes creating two branches
err = doc.Set("points", 100)
require.NoError(t, err)
err = remoteCol1.Update(ctx, doc)
require.NoError(t, err)

err = doc2.Set("points", 200)
require.NoError(t, err)
err = remoteCol2.Update(ctx, doc2)
require.NoError(t, err)

err = doc.Set("age", 31)
require.NoError(t, err)
err = remoteCol1.Update(ctx, doc)
require.NoError(t, err)

err = doc2.Set("age", 32)
require.NoError(t, err)
err = remoteCol2.Update(ctx, doc2)
require.NoError(t, err)

err = doc.Set("email", "alice@yahoo.com")
require.NoError(t, err)
err = remoteCol1.Update(ctx, doc)
require.NoError(t, err)

err = doc2.Set("email", "alice-in-wonderland@yahoo.com")
require.NoError(t, err)
err = remoteCol2.Update(ctx, doc2)
require.NoError(t, err)

// Sync the remote blocks to the local node
err = syncAndMerge(ctx, remoteDB2, remoteDB1, remoteCol2, remoteCol1, doc.ID().String())
require.NoError(t, err)
err = syncAndMerge(ctx, remoteDB1, localDB, remoteCol1, localCol, doc.ID().String())
require.NoError(t, err)

// verify the local node has the same data as the remote node
localDoc, err := localCol.Get(ctx, doc.ID(), false)
require.NoError(t, err)
localDocString, err := localDoc.String()
require.NoError(t, err)
remoteDoc1, err := remoteCol1.Get(ctx, doc.ID(), false)
require.NoError(t, err)
remoteDocString1, err := remoteDoc1.String()
require.NoError(t, err)
require.Equal(t, remoteDocString1, localDocString)
}

func syncAndMerge(ctx context.Context, from, to *db, fromCol, toCol client.Collection, docID string) error {
dsKey := base.MakeDataStoreKeyWithCollectionAndDocID(fromCol.Description(), docID)
headset := clock.NewHeadSet(
from.multistore.Headstore(),
dsKey.WithFieldId(core.COMPOSITE_NAMESPACE).ToHeadStoreKey(),
)

cids, _, err := headset.List(ctx)
if err != nil {
return err
}

for _, cid := range cids {
blockBytes, err := from.multistore.DAGstore().AsIPLDStorage().Get(ctx, cid.KeyString())
if err != nil {
return err
}
block, err := coreblock.GetFromBytes(blockBytes)
if err != nil {
return err
}
err = syncDAG(ctx, from, to, block)
if err != nil {
return err
}
err = to.executeMerge(ctx, events.DAGMerge{
Cid: cid,
SchemaRoot: toCol.SchemaRoot(),
})
if err != nil {
return err
}
}
return nil
}

func syncDAG(ctx context.Context, from, to *db, block *coreblock.Block) error {
lsys := cidlink.DefaultLinkSystem()
lsys.SetWriteStorage(to.multistore.DAGstore().AsIPLDStorage())
_, err := lsys.Store(linking.LinkContext{Ctx: ctx}, coreblock.GetLinkPrototype(), block.GenerateNode())
if err != nil {
return err
}

for _, link := range block.Links {
lsys := cidlink.DefaultLinkSystem()
lsys.SetReadStorage(from.multistore.DAGstore().AsIPLDStorage())
nd, err := lsys.Load(linking.LinkContext{Ctx: ctx}, link, coreblock.SchemaPrototype)
if err != nil {
return err
}
block, err := coreblock.GetFromNode(nd)
if err != nil {
return err
}
err = syncDAG(ctx, from, to, block)
if err != nil {
return err
}
}
return nil
}
13 changes: 6 additions & 7 deletions net/node.go
Original file line number Diff line number Diff line change
@@ -80,7 +80,7 @@ func NewNode(
ctx context.Context,
db client.DB,
opts ...NodeOpt,
) (*Node, error) {
) (node *Node, err error) {
options := DefaultOptions()
for _, opt := range opts {
opt(options)
@@ -103,10 +103,14 @@ func NewNode(
fin := finalizer.NewFinalizer()

ctx, cancel := context.WithCancel(ctx)
defer func() {
if node == nil {
cancel()
}
}()

peerstore, err := pstoreds.NewPeerstore(ctx, db.Peerstore(), pstoreds.DefaultOpts())
if err != nil {
cancel()
return nil, fin.Cleanup(err)
}
fin.Add(peerstore)
@@ -115,7 +119,6 @@ func NewNode(
// generate an ephemeral private key
key, err := crypto.GenerateEd25519()
if err != nil {
cancel()
return nil, fin.Cleanup(err)
}
options.PrivateKey = key
@@ -124,7 +127,6 @@ func NewNode(
// unmarshal the private key bytes
privateKey, err := libp2pCrypto.UnmarshalEd25519PrivateKey(options.PrivateKey)
if err != nil {
cancel()
return nil, fin.Cleanup(err)
}

@@ -155,7 +157,6 @@ func NewNode(

h, err := libp2p.New(libp2pOpts...)
if err != nil {
cancel()
return nil, fin.Cleanup(err)
}
log.InfoContext(
@@ -174,7 +175,6 @@ func NewNode(
pubsub.WithFloodPublish(true),
)
if err != nil {
cancel()
return nil, fin.Cleanup(err)
}
}
@@ -188,7 +188,6 @@ func NewNode(
options.GRPCDialOptions,
)
if err != nil {
cancel()
return nil, fin.Cleanup(err)
}

0 comments on commit c5660b4

Please sign in to comment.