From 87a8954e27c3f8c30d73988a6640786ebce008d5 Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Mon, 11 Nov 2024 15:21:29 -0500 Subject: [PATCH] WIP - Handle collection commits over P2P --- internal/db/merge.go | 116 ++++++++++-------- internal/db/messages.go | 15 ++- net/peer.go | 14 ++- net/server.go | 21 ++-- .../query/commits/branchables/peer_test.go | 48 +------- 5 files changed, 99 insertions(+), 115 deletions(-) diff --git a/internal/db/merge.go b/internal/db/merge.go index 47db8740b1..2ad212356a 100644 --- a/internal/db/merge.go +++ b/internal/db/merge.go @@ -28,6 +28,7 @@ import ( "github.com/sourcenetwork/defradb/event" "github.com/sourcenetwork/defradb/internal/core" coreblock "github.com/sourcenetwork/defradb/internal/core/block" + "github.com/sourcenetwork/defradb/internal/core/crdt" "github.com/sourcenetwork/defradb/internal/db/base" "github.com/sourcenetwork/defradb/internal/encryption" "github.com/sourcenetwork/defradb/internal/keys" @@ -47,18 +48,27 @@ func (db *db) executeMerge(ctx context.Context, dagMerge event.Merge) error { return err } - docID, err := client.NewDocIDFromString(dagMerge.DocID) - if err != nil { - return err - } - dsKey := base.MakeDataStoreKeyWithCollectionAndDocID(col.Description(), docID.String()) + var mt mergeTarget + if dagMerge.DocID != "" { + docID, err := client.NewDocIDFromString(dagMerge.DocID) + if err != nil { + return err + } + dsKey := base.MakeDataStoreKeyWithCollectionAndDocID(col.Description(), docID.String()) // todo - just create a headstore key - mp, err := db.newMergeProcessor(txn, col, dsKey) - if err != nil { - return err + mt, err = getHeadsAsMergeTarget(ctx, txn, dsKey.WithFieldID(core.COMPOSITE_NAMESPACE).ToHeadStoreKey()) + if err != nil { + return err + } + } else { + colKey := keys.NewHeadstoreColKey(col.Description().RootID) + mt, err = getHeadsAsMergeTarget(ctx, txn, colKey) + if err != nil { + return err + } } - mt, err := getHeadsAsMergeTarget(ctx, txn, 
dsKey.WithFieldID(core.COMPOSITE_NAMESPACE)) + mp, err := db.newMergeProcessor(txn, col) if err != nil { return err } @@ -73,9 +83,16 @@ func (db *db) executeMerge(ctx context.Context, dagMerge event.Merge) error { return err } - err = syncIndexedDoc(ctx, docID, col) - if err != nil { - return err + if dagMerge.DocID != "" { // todo - make sure this is called for children of collection commits! Maybe return something from mergeComposites + // or add a new func to get them from state + docID, err := client.NewDocIDFromString(dagMerge.DocID) // todo - this is wasteful, but kind of cheap + if err != nil { + return err + } + err = syncIndexedDoc(ctx, docID, col) + if err != nil { + return err + } } err = txn.Commit(ctx) @@ -135,9 +152,8 @@ type mergeProcessor struct { txn datastore.Txn blockLS linking.LinkSystem encBlockLS linking.LinkSystem - mCRDTs map[string]merklecrdt.MerkleCRDT col *collection - dsKey keys.DataStoreKey + // composites is a list of composites that need to be merged. composites *list.List // missingEncryptionBlocks is a list of blocks that we failed to fetch @@ -149,7 +165,6 @@ type mergeProcessor struct { func (db *db) newMergeProcessor( txn datastore.Txn, col *collection, - dsKey keys.DataStoreKey, ) (*mergeProcessor, error) { blockLS := cidlink.DefaultLinkSystem() blockLS.SetReadStorage(txn.Blockstore().AsIPLDStorage()) @@ -161,9 +176,7 @@ func (db *db) newMergeProcessor( txn: txn, blockLS: blockLS, encBlockLS: encBlockLS, - mCRDTs: make(map[string]merklecrdt.MerkleCRDT), col: col, - dsKey: dsKey, composites: list.New(), missingEncryptionBlocks: make(map[cidlink.Link]struct{}), availableEncryptionBlocks: make(map[cidlink.Link]*coreblock.Encryption), @@ -375,7 +388,7 @@ func (mp *mergeProcessor) processBlock( } if canRead { - crdt, err := mp.initCRDTForType(dagBlock.Delta.GetFieldName()) + crdt, err := mp.initCRDTForType(dagBlock.Delta) if err != nil { return err } @@ -435,47 +448,42 @@ func decryptBlock( return newBlock, nil } -func (mp 
*mergeProcessor) initCRDTForType(field string) (merklecrdt.MerkleCRDT, error) { - mcrdt, exists := mp.mCRDTs[field] - if exists { - return mcrdt, nil - } - +func (mp *mergeProcessor) initCRDTForType(crdt crdt.CRDT) (merklecrdt.MerkleCRDT, error) { schemaVersionKey := keys.CollectionSchemaVersionKey{ SchemaVersionID: mp.col.Schema().VersionID, CollectionID: mp.col.ID(), } - if field == "" { - mcrdt = merklecrdt.NewMerkleCompositeDAG( + switch { + case crdt.CompositeDAGDelta != nil: + dsKey := base.MakeDataStoreKeyWithCollectionAndDocID(mp.col.Description(), string(crdt.GetDocID())) + return merklecrdt.NewMerkleCompositeDAG( mp.txn, schemaVersionKey, - mp.dsKey.WithFieldID(core.COMPOSITE_NAMESPACE), - ) - mp.mCRDTs[field] = mcrdt - return mcrdt, nil - } - - fd, ok := mp.col.Definition().GetFieldByName(field) - if !ok { - // If the field is not part of the schema, we can safely ignore it. - return nil, nil - } + dsKey.WithFieldID(core.COMPOSITE_NAMESPACE), + ), nil + + case crdt.CollectionDelta != nil: + return merklecrdt.NewMerkleCollection(mp.txn, schemaVersionKey, keys.NewHeadstoreColKey(mp.col.Description().RootID)), nil + + default: + field := crdt.GetFieldName() + dsKey := base.MakeDataStoreKeyWithCollectionAndDocID(mp.col.Description(), string(crdt.GetDocID())) + fd, ok := mp.col.Definition().GetFieldByName(field) + if !ok { + // If the field is not part of the schema, we can safely ignore it. 
+ return nil, nil + } - mcrdt, err := merklecrdt.FieldLevelCRDTWithStore( - mp.txn, - schemaVersionKey, - fd.Typ, - fd.Kind, - mp.dsKey.WithFieldID(fd.ID.String()), - field, - ) - if err != nil { - return nil, err + return merklecrdt.FieldLevelCRDTWithStore( + mp.txn, + schemaVersionKey, + fd.Typ, + fd.Kind, + dsKey.WithFieldID(fd.ID.String()), + field, + ) } - - mp.mCRDTs[field] = mcrdt - return mcrdt, nil } func getCollectionFromRootSchema(ctx context.Context, db *db, rootSchema string) (*collection, error) { @@ -498,8 +506,8 @@ func getCollectionFromRootSchema(ctx context.Context, db *db, rootSchema string) // getHeadsAsMergeTarget retrieves the heads of the composite DAG for the given document // and returns them as a merge target. -func getHeadsAsMergeTarget(ctx context.Context, txn datastore.Txn, dsKey keys.DataStoreKey) (mergeTarget, error) { - cids, err := getHeads(ctx, txn, dsKey) +func getHeadsAsMergeTarget(ctx context.Context, txn datastore.Txn, key keys.HeadstoreKey) (mergeTarget, error) { + cids, err := getHeads(ctx, txn, key) if err != nil { return mergeTarget{}, err @@ -520,8 +528,8 @@ func getHeadsAsMergeTarget(ctx context.Context, txn datastore.Txn, dsKey keys.Da } // getHeads retrieves the heads associated with the given datastore key. 
-func getHeads(ctx context.Context, txn datastore.Txn, dsKey keys.DataStoreKey) ([]cid.Cid, error) { - headset := clock.NewHeadSet(txn.Headstore(), dsKey.ToHeadStoreKey()) +func getHeads(ctx context.Context, txn datastore.Txn, key keys.HeadstoreKey) ([]cid.Cid, error) { + headset := clock.NewHeadSet(txn.Headstore(), key) cids, _, err := headset.List(ctx) if err != nil { diff --git a/internal/db/messages.go b/internal/db/messages.go index 51efba982e..f8da92a917 100644 --- a/internal/db/messages.go +++ b/internal/db/messages.go @@ -22,7 +22,8 @@ import ( ) func (db *db) handleMessages(ctx context.Context, sub *event.Subscription) { - queue := newMergeQueue() + docIdQueue := newMergeQueue() + schemaRootQueue := newMergeQueue() // This is used to ensure we only trigger loadAndPublishP2PCollections and loadAndPublishReplicators // once per db instanciation. loadOnce := sync.Once{} @@ -37,9 +38,15 @@ func (db *db) handleMessages(ctx context.Context, sub *event.Subscription) { switch evt := msg.Data.(type) { case event.Merge: go func() { - // ensure only one merge per docID - queue.add(evt.DocID) - defer queue.done(evt.DocID) + if evt.DocID == "" { + // ensure only one merge per schemaRoot + schemaRootQueue.add(evt.SchemaRoot) + defer schemaRootQueue.done(evt.SchemaRoot) + } else { + // ensure only one merge per docID + docIdQueue.add(evt.DocID) + defer docIdQueue.done(evt.DocID) + } // retry the merge process if a conflict occurs // diff --git a/net/peer.go b/net/peer.go index e4ebfe8573..d59d6fe150 100644 --- a/net/peer.go +++ b/net/peer.go @@ -255,9 +255,11 @@ func (p *Peer) handleMessageLoop() { } func (p *Peer) handleLog(evt event.Update) error { - _, err := client.NewDocIDFromString(evt.DocID) - if err != nil { - return NewErrFailedToGetDocID(err) + if evt.DocID != "" { + _, err := client.NewDocIDFromString(evt.DocID) + if err != nil { + return NewErrFailedToGetDocID(err) + } } // push to each peer (replicator) @@ -273,8 +275,10 @@ func (p *Peer) handleLog(evt 
event.Update) error { Block: evt.Block, } - if err := p.server.publishLog(p.ctx, evt.DocID, req); err != nil { - return NewErrPublishingToDocIDTopic(err, evt.Cid.String(), evt.DocID) + if evt.DocID != "" { + if err := p.server.publishLog(p.ctx, evt.DocID, req); err != nil { + return NewErrPublishingToDocIDTopic(err, evt.Cid.String(), evt.DocID) + } } if err := p.server.publishLog(p.ctx, evt.SchemaRoot, req); err != nil { diff --git a/net/server.go b/net/server.go index c83ba3f6be..252280ed3d 100644 --- a/net/server.go +++ b/net/server.go @@ -110,9 +110,12 @@ func (s *server) PushLog(ctx context.Context, req *pushLogRequest) (*pushLogRepl if err != nil { return nil, err } - docID, err := client.NewDocIDFromString(req.DocID) - if err != nil { - return nil, err + + if req.DocID != "" { + _, err := client.NewDocIDFromString(req.DocID) + if err != nil { + return nil, err + } } byPeer, err := libpeer.Decode(req.Creator) if err != nil { @@ -126,11 +129,11 @@ func (s *server) PushLog(ctx context.Context, req *pushLogRequest) (*pushLogRepl log.InfoContext(ctx, "Received pushlog", corelog.Any("PeerID", pid.String()), corelog.Any("Creator", byPeer.String()), - corelog.Any("DocID", docID.String())) + corelog.Any("DocID", req.DocID)) log.InfoContext(ctx, "Starting DAG sync", corelog.Any("PeerID", pid.String()), - corelog.Any("DocID", docID.String())) + corelog.Any("DocID", req.DocID)) err = syncDAG(ctx, s.peer.bserv, block) if err != nil { @@ -139,19 +142,19 @@ func (s *server) PushLog(ctx context.Context, req *pushLogRequest) (*pushLogRepl log.InfoContext(ctx, "DAG sync complete", corelog.Any("PeerID", pid.String()), - corelog.Any("DocID", docID.String())) + corelog.Any("DocID", req.DocID)) // Once processed, subscribe to the DocID topic on the pubsub network unless we already // subscribed to the collection. 
- if !s.hasPubSubTopicAndSubscribed(req.SchemaRoot) { - err = s.addPubSubTopic(docID.String(), true, nil) + if !s.hasPubSubTopicAndSubscribed(req.SchemaRoot) && req.DocID != "" { + err = s.addPubSubTopic(req.DocID, true, nil) if err != nil { return nil, err } } s.peer.bus.Publish(event.NewMessage(event.MergeName, event.Merge{ - DocID: docID.String(), + DocID: req.DocID, ByPeer: byPeer, FromPeer: pid, Cid: headCID, diff --git a/tests/integration/query/commits/branchables/peer_test.go b/tests/integration/query/commits/branchables/peer_test.go index 81ff77a240..c98dd04173 100644 --- a/tests/integration/query/commits/branchables/peer_test.go +++ b/tests/integration/query/commits/branchables/peer_test.go @@ -18,8 +18,6 @@ import ( testUtils "github.com/sourcenetwork/defradb/tests/integration" ) -// TODO: This test documents an unimplemented feature. Tracked by: -// https://github.com/sourcenetwork/defradb/issues/3212 func TestQueryCommitsBranchables_SyncsAcrossPeerConnection(t *testing.T) { test := testUtils.TestCase{ Actions: []any{ @@ -50,15 +48,14 @@ func TestQueryCommitsBranchables_SyncsAcrossPeerConnection(t *testing.T) { }, testUtils.WaitForSync{}, testUtils.Request{ - NodeID: immutable.Some(0), Request: `query { - commits { - cid - links { + commits { cid + links { + cid + } } - } - }`, + }`, Results: map[string]any{ "commits": []map[string]any{ { @@ -91,41 +88,6 @@ func TestQueryCommitsBranchables_SyncsAcrossPeerConnection(t *testing.T) { }, }, }, - testUtils.Request{ - NodeID: immutable.Some(1), - Request: `query { - commits { - cid - links { - cid - } - } - }`, - Results: map[string]any{ - "commits": []map[string]any{ - // Note: The collection commit has not synced. 
- { - "cid": testUtils.NewUniqueCid("age"), - "links": []map[string]any{}, - }, - { - "cid": testUtils.NewUniqueCid("name"), - "links": []map[string]any{}, - }, - { - "cid": testUtils.NewUniqueCid("composite"), - "links": []map[string]any{ - { - "cid": testUtils.NewUniqueCid("age"), - }, - { - "cid": testUtils.NewUniqueCid("name"), - }, - }, - }, - }, - }, - }, }, }