diff --git a/tests/integration/acp.go b/tests/integration/acp.go index 78a5a50997..7bf6e6151f 100644 --- a/tests/integration/acp.go +++ b/tests/integration/acp.go @@ -215,7 +215,12 @@ func addDocActorRelationshipACP( } if action.ExpectedError == "" && !action.ExpectedExistence { - waitForUpdateEvents(s, actionNodeID, map[string]struct{}{docID: {}}) + expect := make([]map[string]struct{}, action.CollectionID+1) + expect[action.CollectionID] = map[string]struct{}{ + docID: {}, + } + + waitForUpdateEvents(s, actionNodeID, expect) } } diff --git a/tests/integration/events.go b/tests/integration/events.go index 0e28f3e3df..9fbdc5dc13 100644 --- a/tests/integration/events.go +++ b/tests/integration/events.go @@ -153,7 +153,7 @@ func waitForUnsubscribeToCollectionEvent(s *state, action UnsubscribeToCollectio func waitForUpdateEvents( s *state, nodeID immutable.Option[int], - docIDs map[string]struct{}, + docIDs []map[string]struct{}, ) { for i := 0; i < len(s.nodes); i++ { if nodeID.HasValue() && nodeID.Value() != i { @@ -166,8 +166,15 @@ func waitForUpdateEvents( } expect := make(map[string]struct{}, len(docIDs)) - for k := range docIDs { - expect[k] = struct{}{} + for collectionIndex, collectionDocIDs := range docIDs { + for k := range collectionDocIDs { + expect[k] = struct{}{} + + col := node.collections[collectionIndex] + if col.Description().IsBranchable { + expect[col.SchemaRoot()] = struct{}{} + } + } } for len(expect) > 0 { @@ -183,16 +190,10 @@ func waitForUpdateEvents( require.Fail(s.t, "timeout waiting for update event", "Node %d", i) } - if evt.DocID == "" { - // Todo: This will almost certainly need to change once P2P for collection-level commits - // is enabled. See: https://github.com/sourcenetwork/defradb/issues/3212 - continue - } - // make sure the event is expected - _, ok := expect[evt.DocID] - require.True(s.t, ok, "unexpected document update", "Node %d", i) - delete(expect, evt.DocID) + _, ok := expect[getUpdateEventKey(evt)] + require.True(s.t, ok, "unexpected document update", getUpdateEventKey(evt)) + delete(expect, getUpdateEventKey(evt)) // we only need to update the network state if the nodes // are configured for networking @@ -260,11 +261,11 @@ func waitForMergeEvents(s *state, action WaitForSync) { delete(expectDecrypted, evt.Merge.DocID) } - head, ok := expect[evt.Merge.DocID] + head, ok := expect[getMergeEventKey(evt.Merge)] if ok && head.String() == evt.Merge.Cid.String() { - delete(expect, evt.Merge.DocID) + delete(expect, getMergeEventKey(evt.Merge)) } - node.p2p.actualDocHeads[evt.Merge.DocID] = docHeadState{cid: evt.Merge.Cid, decrypted: evt.Decrypted} + node.p2p.actualDocHeads[getMergeEventKey(evt.Merge)] = docHeadState{cid: evt.Merge.Cid, decrypted: evt.Decrypted} } } } @@ -284,23 +285,23 @@ func updateNetworkState(s *state, nodeID int, evt event.Update) { // update the actual document head on the node that updated it // as the node created the document, it is already decrypted - node.p2p.actualDocHeads[evt.DocID] = docHeadState{cid: evt.Cid, decrypted: true} + node.p2p.actualDocHeads[getUpdateEventKey(evt)] = docHeadState{cid: evt.Cid, decrypted: true} // update the expected document heads of replicator targets for id := range node.p2p.replicators { // replicator target nodes push updates to source nodes - s.nodes[id].p2p.expectedDocHeads[evt.DocID] = evt.Cid + s.nodes[id].p2p.expectedDocHeads[getUpdateEventKey(evt)] = evt.Cid } // update the expected document heads of connected nodes for id := range node.p2p.connections { // connected nodes share updates of documents they have in common - if _, ok := s.nodes[id].p2p.actualDocHeads[evt.DocID]; ok { - s.nodes[id].p2p.expectedDocHeads[evt.DocID] = evt.Cid + if _, ok := s.nodes[id].p2p.actualDocHeads[getUpdateEventKey(evt)]; ok { + s.nodes[id].p2p.expectedDocHeads[getUpdateEventKey(evt)] = evt.Cid } // peer collection subscribers receive updates from any other subscriber node if _, ok := s.nodes[id].p2p.peerCollections[collectionID]; ok { - s.nodes[id].p2p.expectedDocHeads[evt.DocID] = evt.Cid + s.nodes[id].p2p.expectedDocHeads[getUpdateEventKey(evt)] = evt.Cid } } @@ -312,21 +313,24 @@ func updateNetworkState(s *state, nodeID int, evt event.Update) { // getEventsForUpdateDoc returns a map of docIDs that should be // published to the local event bus after an UpdateDoc action. -func getEventsForUpdateDoc(s *state, action UpdateDoc) map[string]struct{} { +func getEventsForUpdateDoc(s *state, action UpdateDoc) []map[string]struct{} { docID := s.docIDs[action.CollectionID][action.DocID] docMap := make(map[string]any) err := json.Unmarshal([]byte(action.Doc), &docMap) require.NoError(s.t, err) - return map[string]struct{}{ + expect := make([]map[string]struct{}, action.CollectionID+1) + expect[action.CollectionID] = map[string]struct{}{ docID.String(): {}, } + + return expect } // getEventsForCreateDoc returns a map of docIDs that should be // published to the local event bus after a CreateDoc action. -func getEventsForCreateDoc(s *state, action CreateDoc) map[string]struct{} { +func getEventsForCreateDoc(s *state, action CreateDoc) []map[string]struct{} { var collection client.Collection if action.NodeID.HasValue() { collection = s.nodes[action.NodeID.Value()].collections[action.CollectionID] @@ -337,10 +341,11 @@ func getEventsForCreateDoc(s *state, action CreateDoc) map[string]struct{} { docs, err := parseCreateDocs(action, collection) require.NoError(s.t, err) - expect := make(map[string]struct{}) + expect := make([]map[string]struct{}, action.CollectionID+1) + expect[action.CollectionID] = map[string]struct{}{} for _, doc := range docs { - expect[doc.ID().String()] = struct{}{} + expect[action.CollectionID][doc.ID().String()] = struct{}{} } return expect @@ -356,16 +361,33 @@ func getEventsForUpdateWithFilter( s *state, action UpdateWithFilter, result *client.UpdateResult, -) map[string]struct{} { +) []map[string]struct{} { var docPatch map[string]any err := json.Unmarshal([]byte(action.Updater), &docPatch) require.NoError(s.t, err) - expect := make(map[string]struct{}) + expect := make([]map[string]struct{}, action.CollectionID+1) + expect[action.CollectionID] = map[string]struct{}{} for _, docID := range result.DocIDs { - expect[docID] = struct{}{} + expect[action.CollectionID][docID] = struct{}{} } return expect } + +func getUpdateEventKey(evt event.Update) string { //todo - doc + if evt.DocID == "" { + return evt.SchemaRoot + } + + return evt.DocID +} + +func getMergeEventKey(evt event.Merge) string { //todo - doc + if evt.DocID == "" { + return evt.SchemaRoot + } + + return evt.DocID +} diff --git a/tests/integration/utils.go b/tests/integration/utils.go index 39c9ea9624..66f4f48fe5 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -1404,10 +1404,12 @@ func deleteDoc( assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised) if action.ExpectedError == "" { - docIDs := map[string]struct{}{ + expect := make([]map[string]struct{}, action.CollectionID+1) + expect[action.CollectionID] = map[string]struct{}{ docID.String(): {}, } - waitForUpdateEvents(s, action.NodeID, docIDs) + + waitForUpdateEvents(s, action.NodeID, expect) } }