WIP - Handle collection commits over P2P
AndrewSisley committed Nov 13, 2024
1 parent dae2820 commit 87a8954
Showing 5 changed files with 99 additions and 115 deletions.
116 changes: 62 additions & 54 deletions internal/db/merge.go
@@ -28,6 +28,7 @@ import (
"github.com/sourcenetwork/defradb/event"
"github.com/sourcenetwork/defradb/internal/core"
coreblock "github.com/sourcenetwork/defradb/internal/core/block"
"github.com/sourcenetwork/defradb/internal/core/crdt"
"github.com/sourcenetwork/defradb/internal/db/base"
"github.com/sourcenetwork/defradb/internal/encryption"
"github.com/sourcenetwork/defradb/internal/keys"
@@ -47,18 +48,27 @@ func (db *db) executeMerge(ctx context.Context, dagMerge event.Merge) error {
return err
}

docID, err := client.NewDocIDFromString(dagMerge.DocID)
if err != nil {
return err
}
dsKey := base.MakeDataStoreKeyWithCollectionAndDocID(col.Description(), docID.String())
var mt mergeTarget
if dagMerge.DocID != "" {
docID, err := client.NewDocIDFromString(dagMerge.DocID)
if err != nil {
return err
}
dsKey := base.MakeDataStoreKeyWithCollectionAndDocID(col.Description(), docID.String()) // todo - just create a headstore key

mp, err := db.newMergeProcessor(txn, col, dsKey)
if err != nil {
return err
mt, err = getHeadsAsMergeTarget(ctx, txn, dsKey.WithFieldID(core.COMPOSITE_NAMESPACE).ToHeadStoreKey())
if err != nil {
return err
}
} else {
colKey := keys.NewHeadstoreColKey(col.Description().RootID)
mt, err = getHeadsAsMergeTarget(ctx, txn, colKey)
if err != nil {
return err
}
}

mt, err := getHeadsAsMergeTarget(ctx, txn, dsKey.WithFieldID(core.COMPOSITE_NAMESPACE))
mp, err := db.newMergeProcessor(txn, col)
if err != nil {
return err
}
@@ -73,9 +83,16 @@ func (db *db) executeMerge(ctx context.Context, dagMerge event.Merge) error {
return err
}

err = syncIndexedDoc(ctx, docID, col)
if err != nil {
return err
if dagMerge.DocID != "" { // todo - make sure this is called for children of collection commits! Maybe return something from mergeComposites
// or add a new func to get them from state
docID, err := client.NewDocIDFromString(dagMerge.DocID) // todo - this is wasteful, but kind of cheap
if err != nil {
return err
}
err = syncIndexedDoc(ctx, docID, col)
if err != nil {
return err
}
}

err = txn.Commit(ctx)
@@ -135,9 +152,8 @@ type mergeProcessor struct {
txn datastore.Txn
blockLS linking.LinkSystem
encBlockLS linking.LinkSystem
mCRDTs map[string]merklecrdt.MerkleCRDT
col *collection
dsKey keys.DataStoreKey

// composites is a list of composites that need to be merged.
composites *list.List
// missingEncryptionBlocks is a list of blocks that we failed to fetch
@@ -149,7 +165,6 @@ type mergeProcessor struct {
func (db *db) newMergeProcessor(
txn datastore.Txn,
col *collection,
dsKey keys.DataStoreKey,
) (*mergeProcessor, error) {
blockLS := cidlink.DefaultLinkSystem()
blockLS.SetReadStorage(txn.Blockstore().AsIPLDStorage())
@@ -161,9 +176,7 @@ func (db *db) newMergeProcessor(
txn: txn,
blockLS: blockLS,
encBlockLS: encBlockLS,
mCRDTs: make(map[string]merklecrdt.MerkleCRDT),
col: col,
dsKey: dsKey,
composites: list.New(),
missingEncryptionBlocks: make(map[cidlink.Link]struct{}),
availableEncryptionBlocks: make(map[cidlink.Link]*coreblock.Encryption),
@@ -375,7 +388,7 @@ func (mp *mergeProcessor) processBlock(
}

if canRead {
crdt, err := mp.initCRDTForType(dagBlock.Delta.GetFieldName())
crdt, err := mp.initCRDTForType(dagBlock.Delta)
if err != nil {
return err
}
@@ -435,47 +448,42 @@ func decryptBlock(
return newBlock, nil
}

func (mp *mergeProcessor) initCRDTForType(field string) (merklecrdt.MerkleCRDT, error) {
mcrdt, exists := mp.mCRDTs[field]
if exists {
return mcrdt, nil
}

func (mp *mergeProcessor) initCRDTForType(crdt crdt.CRDT) (merklecrdt.MerkleCRDT, error) {
schemaVersionKey := keys.CollectionSchemaVersionKey{
SchemaVersionID: mp.col.Schema().VersionID,
CollectionID: mp.col.ID(),
}

if field == "" {
mcrdt = merklecrdt.NewMerkleCompositeDAG(
switch {
case crdt.CompositeDAGDelta != nil:
dsKey := base.MakeDataStoreKeyWithCollectionAndDocID(mp.col.Description(), string(crdt.GetDocID()))
return merklecrdt.NewMerkleCompositeDAG(
mp.txn,
schemaVersionKey,
mp.dsKey.WithFieldID(core.COMPOSITE_NAMESPACE),
)
mp.mCRDTs[field] = mcrdt
return mcrdt, nil
}

fd, ok := mp.col.Definition().GetFieldByName(field)
if !ok {
// If the field is not part of the schema, we can safely ignore it.
return nil, nil
}
dsKey.WithFieldID(core.COMPOSITE_NAMESPACE),
), nil

case crdt.CollectionDelta != nil:
return merklecrdt.NewMerkleCollection(mp.txn, schemaVersionKey, keys.NewHeadstoreColKey(mp.col.Description().RootID)), nil

default:
field := crdt.GetFieldName()
dsKey := base.MakeDataStoreKeyWithCollectionAndDocID(mp.col.Description(), string(crdt.GetDocID()))
fd, ok := mp.col.Definition().GetFieldByName(field)
if !ok {
// If the field is not part of the schema, we can safely ignore it.
return nil, nil
}

mcrdt, err := merklecrdt.FieldLevelCRDTWithStore(
mp.txn,
schemaVersionKey,
fd.Typ,
fd.Kind,
mp.dsKey.WithFieldID(fd.ID.String()),
field,
)
if err != nil {
return nil, err
return merklecrdt.FieldLevelCRDTWithStore(
mp.txn,
schemaVersionKey,
fd.Typ,
fd.Kind,
dsKey.WithFieldID(fd.ID.String()),
field,
)
}

mp.mCRDTs[field] = mcrdt
return mcrdt, nil
}
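
The rewritten initCRDTForType above no longer keeps a per-field CRDT cache; it dispatches on which delta variant the block carries. A minimal, self-contained model of that dispatch, using simplified stand-ins for the delta union in internal/core/crdt and for the merklecrdt constructors:

package main

import "fmt"

// delta is a simplified stand-in for the CRDT delta union carried by a block.
type delta struct {
	isComposite  bool
	isCollection bool
	fieldName    string
}

// resolveCRDT mirrors the switch in initCRDTForType: composite deltas map to a
// per-document composite-DAG CRDT, collection deltas to a CRDT keyed by the
// collection root, and anything else to a field-level CRDT for the named field.
func resolveCRDT(d delta) string {
	switch {
	case d.isComposite:
		return "merkle composite DAG (per-document headstore key)"
	case d.isCollection:
		return "merkle collection (per-collection headstore key)"
	default:
		return fmt.Sprintf("field-level CRDT for field %q", d.fieldName)
	}
}

func main() {
	fmt.Println(resolveCRDT(delta{isComposite: true}))
	fmt.Println(resolveCRDT(delta{isCollection: true}))
	fmt.Println(resolveCRDT(delta{fieldName: "age"}))
}

Keying the dispatch on the delta rather than the field name is what lets a single merge processor handle document, field, and the new collection-level blocks in one pass.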

func getCollectionFromRootSchema(ctx context.Context, db *db, rootSchema string) (*collection, error) {
@@ -498,8 +506,8 @@ func getCollectionFromRootSchema(ctx context.Context, db *db, rootSchema string)

// getHeadsAsMergeTarget retrieves the heads of the DAG stored under the given headstore key
// and returns them as a merge target.
func getHeadsAsMergeTarget(ctx context.Context, txn datastore.Txn, dsKey keys.DataStoreKey) (mergeTarget, error) {
cids, err := getHeads(ctx, txn, dsKey)
func getHeadsAsMergeTarget(ctx context.Context, txn datastore.Txn, key keys.HeadstoreKey) (mergeTarget, error) {
cids, err := getHeads(ctx, txn, key)

if err != nil {
return mergeTarget{}, err
@@ -520,8 +528,8 @@ func getHeadsAsMergeTarget(ctx context.Context, txn datastore.Txn, dsKey keys.Da
}

// getHeads retrieves the heads associated with the given headstore key.
func getHeads(ctx context.Context, txn datastore.Txn, dsKey keys.DataStoreKey) ([]cid.Cid, error) {
headset := clock.NewHeadSet(txn.Headstore(), dsKey.ToHeadStoreKey())
func getHeads(ctx context.Context, txn datastore.Txn, key keys.HeadstoreKey) ([]cid.Cid, error) {
headset := clock.NewHeadSet(txn.Headstore(), key)

cids, _, err := headset.List(ctx)
if err != nil {
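
The executeMerge change is the heart of this WIP: a merge event with an empty DocID is treated as a collection-level (branchable) commit, so its merge target is built from a collection headstore key rather than from a per-document composite-field key. A hedged, self-contained sketch of that key selection, with simplified stand-ins for the types in internal/keys:

package main

import "fmt"

// headstoreKey is a simplified stand-in for keys.HeadstoreKey.
type headstoreKey interface{ String() string }

// headstoreDocKey models the per-document composite-field key used for
// document commits.
type headstoreDocKey struct{ docID, fieldID string }

func (k headstoreDocKey) String() string { return "/d/" + k.docID + "/" + k.fieldID }

// headstoreColKey models keys.NewHeadstoreColKey, keyed by the collection root.
type headstoreColKey struct{ rootID uint32 }

func (k headstoreColKey) String() string { return fmt.Sprintf("/c/%d", k.rootID) }

// mergeTargetKey mirrors the branch added to executeMerge: an empty docID
// signals a collection commit.
func mergeTargetKey(docID string, colRootID uint32) headstoreKey {
	if docID != "" {
		// "C" stands in for core.COMPOSITE_NAMESPACE.
		return headstoreDocKey{docID: docID, fieldID: "C"}
	}
	return headstoreColKey{rootID: colRootID}
}

func main() {
	fmt.Println(mergeTargetKey("bae-123", 7)) // /d/bae-123/C
	fmt.Println(mergeTargetKey("", 7))        // /c/7
}

The docID and the String outputs here are illustrative only; the real key formats are defined by the keys package.
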
15 changes: 11 additions & 4 deletions internal/db/messages.go
@@ -22,7 +22,8 @@ import (
)

func (db *db) handleMessages(ctx context.Context, sub *event.Subscription) {
queue := newMergeQueue()
docIdQueue := newMergeQueue()
schemaRootQueue := newMergeQueue()
// This is used to ensure we only trigger loadAndPublishP2PCollections and loadAndPublishReplicators
// once per db instantiation.
loadOnce := sync.Once{}
@@ -37,9 +38,15 @@ func (db *db) handleMessages(ctx context.Context, sub *event.Subscription) {
switch evt := msg.Data.(type) {
case event.Merge:
go func() {
// ensure only one merge per docID
queue.add(evt.DocID)
defer queue.done(evt.DocID)
if evt.DocID == "" {
// ensure only one merge per schemaRoot
schemaRootQueue.add(evt.SchemaRoot)
defer schemaRootQueue.done(evt.SchemaRoot)
} else {
// ensure only one merge per docID
docIdQueue.add(evt.DocID)
defer docIdQueue.done(evt.DocID)
}

// retry the merge process if a conflict occurs
//
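
handleMessages now maintains two queues because collection-level merges carry no DocID: those serialize per schema root while document merges keep serializing per DocID, so the two kinds never contend with each other. The sketch below models the queue contract this relies on; the real newMergeQueue lives in internal/db, and this simplified version only assumes that add blocks until the key is free and done releases it:

package main

import (
	"fmt"
	"sync"
)

// mergeQueue serializes work per key so only one merge for a given
// docID (or schema root) runs at a time.
type mergeQueue struct {
	mu   sync.Mutex
	keys map[string]chan struct{}
}

func newMergeQueue() *mergeQueue {
	return &mergeQueue{keys: make(map[string]chan struct{})}
}

// add blocks until no other goroutine holds the given key, then claims it.
func (q *mergeQueue) add(key string) {
	for {
		q.mu.Lock()
		if ch, busy := q.keys[key]; busy {
			q.mu.Unlock()
			<-ch // wait for the current holder to finish, then retry
			continue
		}
		q.keys[key] = make(chan struct{})
		q.mu.Unlock()
		return
	}
}

// done releases the key and wakes any waiters.
func (q *mergeQueue) done(key string) {
	q.mu.Lock()
	ch := q.keys[key]
	delete(q.keys, key)
	q.mu.Unlock()
	close(ch)
}

func main() {
	q := newMergeQueue()
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			q.add("schema-root-A") // only one merge for this key at a time
			defer q.done("schema-root-A")
			fmt.Printf("merge %d running\n", n)
		}(i)
	}
	wg.Wait()
}
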
14 changes: 9 additions & 5 deletions net/peer.go
@@ -255,9 +255,11 @@ func (p *Peer) handleMessageLoop() {
}

func (p *Peer) handleLog(evt event.Update) error {
_, err := client.NewDocIDFromString(evt.DocID)
if err != nil {
return NewErrFailedToGetDocID(err)
if evt.DocID != "" {
_, err := client.NewDocIDFromString(evt.DocID)
if err != nil {
return NewErrFailedToGetDocID(err)
}
}

// push to each peer (replicator)
Expand All @@ -273,8 +275,10 @@ func (p *Peer) handleLog(evt event.Update) error {
Block: evt.Block,
}

if err := p.server.publishLog(p.ctx, evt.DocID, req); err != nil {
return NewErrPublishingToDocIDTopic(err, evt.Cid.String(), evt.DocID)
if evt.DocID != "" {
if err := p.server.publishLog(p.ctx, evt.DocID, req); err != nil {
return NewErrPublishingToDocIDTopic(err, evt.Cid.String(), evt.DocID)
}
}

if err := p.server.publishLog(p.ctx, evt.SchemaRoot, req); err != nil {
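
With this change handleLog always publishes to the schema-root topic and only additionally to the per-document topic when the event carries a DocID, which is how collection-level commits reach peers subscribed at the collection level. A minimal sketch of that fan-out, where the publish callback stands in for server.publishLog:

package main

import "fmt"

// publishUpdate models the fan-out in handleLog: the per-document topic is
// skipped for collection commits (empty docID); the schema-root topic is
// always used.
func publishUpdate(docID, schemaRoot string, publish func(topic string) error) error {
	if docID != "" {
		if err := publish(docID); err != nil {
			return err
		}
	}
	return publish(schemaRoot)
}

func main() {
	logTopic := func(topic string) error {
		fmt.Println("published to", topic)
		return nil
	}
	_ = publishUpdate("bae-123", "schema-root-A", logTopic) // doc + collection topics
	_ = publishUpdate("", "schema-root-A", logTopic)        // collection topic only
}
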
21 changes: 12 additions & 9 deletions net/server.go
@@ -110,9 +110,12 @@ func (s *server) PushLog(ctx context.Context, req *pushLogRequest) (*pushLogRepl
if err != nil {
return nil, err
}
docID, err := client.NewDocIDFromString(req.DocID)
if err != nil {
return nil, err

if req.DocID != "" {
_, err := client.NewDocIDFromString(req.DocID)
if err != nil {
return nil, err
}
}
byPeer, err := libpeer.Decode(req.Creator)
if err != nil {
@@ -126,11 +129,11 @@ func (s *server) PushLog(ctx context.Context, req *pushLogRequest) (*pushLogRepl
log.InfoContext(ctx, "Received pushlog",
corelog.Any("PeerID", pid.String()),
corelog.Any("Creator", byPeer.String()),
corelog.Any("DocID", docID.String()))
corelog.Any("DocID", req.DocID))

log.InfoContext(ctx, "Starting DAG sync",
corelog.Any("PeerID", pid.String()),
corelog.Any("DocID", docID.String()))
corelog.Any("DocID", req.DocID))

err = syncDAG(ctx, s.peer.bserv, block)
if err != nil {
@@ -139,19 +142,19 @@

log.InfoContext(ctx, "DAG sync complete",
corelog.Any("PeerID", pid.String()),
corelog.Any("DocID", docID.String()))
corelog.Any("DocID", req.DocID))

// Once processed, subscribe to the DocID topic on the pubsub network unless we already
// subscribed to the collection.
if !s.hasPubSubTopicAndSubscribed(req.SchemaRoot) {
err = s.addPubSubTopic(docID.String(), true, nil)
if !s.hasPubSubTopicAndSubscribed(req.SchemaRoot) && req.DocID != "" {
err = s.addPubSubTopic(req.DocID, true, nil)
if err != nil {
return nil, err
}
}

s.peer.bus.Publish(event.NewMessage(event.MergeName, event.Merge{
DocID: docID.String(),
DocID: req.DocID,
ByPeer: byPeer,
FromPeer: pid,
Cid: headCID,
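
On the receiving side, PushLog treats the DocID as optional: it validates it only when present and, after the DAG sync, subscribes to the per-document topic only if the pushed log targets a document that the collection (schema-root) subscription does not already cover. A small sketch of that guard, with the condition taken from the diff and everything else assumed:

package main

import "fmt"

// shouldSubscribeToDoc mirrors the post-sync condition in PushLog:
// !s.hasPubSubTopicAndSubscribed(req.SchemaRoot) && req.DocID != "".
func shouldSubscribeToDoc(docID string, subscribedToCollection bool) bool {
	return !subscribedToCollection && docID != ""
}

func main() {
	fmt.Println(shouldSubscribeToDoc("bae-123", false)) // true: follow this doc
	fmt.Println(shouldSubscribeToDoc("bae-123", true))  // false: collection topic covers it
	fmt.Println(shouldSubscribeToDoc("", false))        // false: collection commit
}
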
48 changes: 5 additions & 43 deletions tests/integration/query/commits/branchables/peer_test.go
@@ -18,8 +18,6 @@ import (
testUtils "github.com/sourcenetwork/defradb/tests/integration"
)

// TODO: This test documents an unimplemented feature. Tracked by:
// https://github.com/sourcenetwork/defradb/issues/3212
func TestQueryCommitsBranchables_SyncsAcrossPeerConnection(t *testing.T) {
test := testUtils.TestCase{
Actions: []any{
@@ -50,15 +48,14 @@ func TestQueryCommitsBranchables_SyncsAcrossPeerConnection(t *testing.T) {
},
testUtils.WaitForSync{},
testUtils.Request{
NodeID: immutable.Some(0),
Request: `query {
		commits {
			cid
			links {
				cid
			}
		}
	}`,
Results: map[string]any{
"commits": []map[string]any{
{
@@ -91,41 +88,6 @@ func TestQueryCommitsBranchables_SyncsAcrossPeerConnection(t *testing.T) {
},
},
},
testUtils.Request{
NodeID: immutable.Some(1),
Request: `query {
commits {
cid
links {
cid
}
}
}`,
Results: map[string]any{
"commits": []map[string]any{
// Note: The collection commit has not synced.
{
"cid": testUtils.NewUniqueCid("age"),
"links": []map[string]any{},
},
{
"cid": testUtils.NewUniqueCid("name"),
"links": []map[string]any{},
},
{
"cid": testUtils.NewUniqueCid("composite"),
"links": []map[string]any{
{
"cid": testUtils.NewUniqueCid("age"),
},
{
"cid": testUtils.NewUniqueCid("name"),
},
},
},
},
},
},
},
}

