diff --git a/internal/db/fetcher/versioned.go b/internal/db/fetcher/versioned.go index 6ce8f94ebc..933e4cf603 100644 --- a/internal/db/fetcher/versioned.go +++ b/internal/db/fetcher/versioned.go @@ -415,7 +415,7 @@ func (vf *VersionedFetcher) processBlock( vf.mCRDTs[crdtIndex] = mcrdt } - err = mcrdt.Clock().ProcessBlock(vf.ctx, block, blockLink) + err = mcrdt.Clock().ProcessBlock(vf.ctx, block, blockLink, false) return err } diff --git a/internal/db/merge.go b/internal/db/merge.go index 920ecd78cb..f476a509a7 100644 --- a/internal/db/merge.go +++ b/internal/db/merge.go @@ -274,13 +274,13 @@ func (mp *mergeProcessor) mergeComposites(ctx context.Context) error { for e := mp.composites.Front(); e != nil; e = e.Next() { block := e.Value.(*coreblock.Block) if block.IsEncrypted != nil && *block.IsEncrypted { - continue + onlyHeads = true } link, err := block.GenerateLink() if err != nil { return err } - err = mp.processBlock(ctx, block, link) + err = mp.processBlock(ctx, block, link, onlyHeads) if err != nil { return err } @@ -293,6 +293,7 @@ func (mp *mergeProcessor) processBlock( ctx context.Context, block *coreblock.Block, blockLink cidlink.Link, + onlyHeads bool, ) error { crdt, err := mp.initCRDTForType(block.Delta.GetFieldName()) if err != nil { @@ -305,7 +306,7 @@ func (mp *mergeProcessor) processBlock( return nil } - err = crdt.Clock().ProcessBlock(ctx, block, blockLink) + err = crdt.Clock().ProcessBlock(ctx, block, blockLink, onlyHeads) if err != nil { return err } @@ -325,7 +326,7 @@ func (mp *mergeProcessor) processBlock( return err } - if err := mp.processBlock(ctx, childBlock, link.Link); err != nil { + if err := mp.processBlock(ctx, childBlock, link.Link, onlyHeads); err != nil { return err } } diff --git a/internal/merkle/clock/clock.go b/internal/merkle/clock/clock.go index 62a7a145a6..f8d7449ca4 100644 --- a/internal/merkle/clock/clock.go +++ b/internal/merkle/clock/clock.go @@ -112,6 +112,7 @@ func (mc *MerkleClock) AddDelta( ctx, block, link, + false, ) if err != nil { return cidlink.Link{}, nil, err @@ -168,13 +169,24 @@ func (mc *MerkleClock) ProcessBlock( ctx context.Context, block *coreblock.Block, blockLink cidlink.Link, + onlyHeads bool, ) error { - priority := block.Delta.GetPriority() - + if !onlyHeads { err := mc.crdt.Merge(ctx, block.Delta.GetDelta()) if err != nil { return NewErrMergingDelta(blockLink.Cid, err) } + } + + return mc.updateHeads(ctx, block, blockLink) +} + +func (mc *MerkleClock) updateHeads( + ctx context.Context, + block *coreblock.Block, + blockLink cidlink.Link, +) error { + priority := block.Delta.GetPriority() // check if we have any HEAD links hasHeads := false diff --git a/internal/merkle/crdt/merklecrdt.go b/internal/merkle/crdt/merklecrdt.go index abc0ffeb51..85e42952cf 100644 --- a/internal/merkle/crdt/merklecrdt.go +++ b/internal/merkle/crdt/merklecrdt.go @@ -47,7 +47,10 @@ type MerkleClock interface { delta core.Delta, links ...coreblock.DAGLink, ) (cidlink.Link, []byte, error) - ProcessBlock(context.Context, *coreblock.Block, cidlink.Link) error + // ProcessBlock processes a block and updates the CRDT state. + // The bool argument indicates whether only heads need to be updated. It is needed in case + // merge should be skipped for example if the block is encrypted. + ProcessBlock(context.Context, *coreblock.Block, cidlink.Link, bool) error } // baseMerkleCRDT handles the MerkleCRDT overhead functions that aren't CRDT specific like the mutations and state diff --git a/tests/integration/encryption/peer_test.go b/tests/integration/encryption/peer_test.go index 6793f3991f..6d9c937278 100644 --- a/tests/integration/encryption/peer_test.go +++ b/tests/integration/encryption/peer_test.go @@ -23,14 +23,7 @@ func TestDocEncryptionPeer_IfPeerHasNoKey_ShouldNotFetch(t *testing.T) { Actions: []any{ testUtils.RandomNetworkingConfig(), testUtils.RandomNetworkingConfig(), - testUtils.SchemaUpdate{ - Schema: ` - type Users { - name: String - age: Int - } - `, - }, + updateUserCollectionSchema(), testUtils.ConnectPeers{ SourceNodeID: 1, TargetNodeID: 0, @@ -63,3 +56,92 @@ func TestDocEncryptionPeer_IfPeerHasNoKey_ShouldNotFetch(t *testing.T) { testUtils.ExecuteTestCase(t, test) } +func TestDocEncryptionPeer_UponSync_ShouldSyncEncryptedDAG(t *testing.T) { + test := testUtils.TestCase{ + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + updateUserCollectionSchema(), + testUtils.ConnectPeers{ + SourceNodeID: 1, + TargetNodeID: 0, + }, + testUtils.SubscribeToCollection{ + NodeID: 1, + CollectionIDs: []int{0}, + }, + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Doc: `{ + "name": "John", + "age": 21 + }`, + IsEncrypted: true, + }, + testUtils.WaitForSync{}, + testUtils.Request{ + NodeID: immutable.Some(1), + Request: ` + query { + commits { + cid + collectionID + delta + docID + fieldId + fieldName + height + links { + cid + name + } + } + } + `, + Results: []map[string]any{ + { + "cid": "bafyreih7ry7ef26xn3lm2rhxusf2rbgyvl535tltrt6ehpwtvdnhlmptiu", + "collectionID": int64(1), + "delta": encrypt(testUtils.CBORValue(21)), + "docID": "bae-c9fb0fa4-1195-589c-aa54-e68333fb90b3", + "fieldId": "1", + "fieldName": "age", + "height": int64(1), + "links": []map[string]any{}, + }, + { + "cid": "bafyreifusejlwidaqswasct37eorazlfix6vyyn5af42pmjvktilzj5cty", + "collectionID": int64(1), + "delta": encrypt(testUtils.CBORValue("John")), + "docID": "bae-c9fb0fa4-1195-589c-aa54-e68333fb90b3", + "fieldId": "2", + "fieldName": "name", + "height": int64(1), + "links": []map[string]any{}, + }, + { + "cid": "bafyreicvxlfxeqghmc3gy56rp5rzfejnbng4nu77x5e3wjinfydl6wvycq", + "collectionID": int64(1), + "delta": nil, + "docID": "bae-c9fb0fa4-1195-589c-aa54-e68333fb90b3", + "fieldId": "C", + "fieldName": nil, + "height": int64(1), + "links": []map[string]any{ + { + "cid": "bafyreifusejlwidaqswasct37eorazlfix6vyyn5af42pmjvktilzj5cty", + "name": "name", + }, + { + "cid": "bafyreih7ry7ef26xn3lm2rhxusf2rbgyvl535tltrt6ehpwtvdnhlmptiu", + "name": "age", + }, + }, + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +}