From 7195ae759c410a4cebeb1bfec482d232ff690c6e Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Fri, 15 Nov 2024 14:52:04 -0500 Subject: [PATCH] WIP PR FIXUP - Fix waitForSync test action --- tests/integration/acp.go | 7 ++- tests/integration/events.go | 96 ++++++++++++++++++++++++------------- tests/integration/state.go | 20 +++++--- tests/integration/utils.go | 6 ++- 4 files changed, 86 insertions(+), 43 deletions(-) diff --git a/tests/integration/acp.go b/tests/integration/acp.go index 78a5a50997..7bf6e6151f 100644 --- a/tests/integration/acp.go +++ b/tests/integration/acp.go @@ -215,7 +215,12 @@ func addDocActorRelationshipACP( } if action.ExpectedError == "" && !action.ExpectedExistence { - waitForUpdateEvents(s, actionNodeID, map[string]struct{}{docID: {}}) + expect := make([]map[string]struct{}, action.CollectionID+1) + expect[action.CollectionID] = map[string]struct{}{ + docID: {}, + } + + waitForUpdateEvents(s, actionNodeID, expect) } } diff --git a/tests/integration/events.go b/tests/integration/events.go index 0e28f3e3df..25846ed76d 100644 --- a/tests/integration/events.go +++ b/tests/integration/events.go @@ -73,8 +73,8 @@ func waitForReplicatorConfigureEvent(s *state, cfg ConfigureReplicator) { } // all previous documents should be merged on the subscriber node - for key, val := range s.nodes[cfg.SourceNodeID].p2p.actualDocHeads { - s.nodes[cfg.TargetNodeID].p2p.expectedDocHeads[key] = val.cid + for key, val := range s.nodes[cfg.SourceNodeID].p2p.actualDAGHeads { + s.nodes[cfg.TargetNodeID].p2p.expectedDAGHeads[key] = val.cid } // update node connections and replicators @@ -153,7 +153,7 @@ func waitForUnsubscribeToCollectionEvent(s *state, action UnsubscribeToCollectio func waitForUpdateEvents( s *state, nodeID immutable.Option[int], - docIDs map[string]struct{}, + docIDs []map[string]struct{}, ) { for i := 0; i < len(s.nodes); i++ { if nodeID.HasValue() && nodeID.Value() != i { @@ -166,8 +166,15 @@ func waitForUpdateEvents( } expect := make(map[string]struct{}, len(docIDs)) - for k := range docIDs { - expect[k] = struct{}{} + for collectionIndex, collectionDocIDs := range docIDs { + for k := range collectionDocIDs { + expect[k] = struct{}{} + + col := node.collections[collectionIndex] + if col.Description().IsBranchable { + expect[col.SchemaRoot()] = struct{}{} + } + } } for len(expect) > 0 { @@ -183,16 +190,10 @@ func waitForUpdateEvents( require.Fail(s.t, "timeout waiting for update event", "Node %d", i) } - if evt.DocID == "" { - // Todo: This will almost certainly need to change once P2P for collection-level commits - // is enabled. See: https://github.com/sourcenetwork/defradb/issues/3212 - continue - } - // make sure the event is expected - _, ok := expect[evt.DocID] - require.True(s.t, ok, "unexpected document update", "Node %d", i) - delete(expect, evt.DocID) + _, ok := expect[getUpdateEventKey(evt)] + require.True(s.t, ok, "unexpected document update", getUpdateEventKey(evt)) + delete(expect, getUpdateEventKey(evt)) // we only need to update the network state if the nodes // are configured for networking @@ -214,11 +215,11 @@ func waitForMergeEvents(s *state, action WaitForSync) { continue // node is closed } - expect := node.p2p.expectedDocHeads + expect := node.p2p.expectedDAGHeads // remove any docs that are already merged // up to the expected document head - for key, val := range node.p2p.actualDocHeads { + for key, val := range node.p2p.actualDAGHeads { if head, ok := expect[key]; ok && head.String() == val.cid.String() { delete(expect, key) } @@ -230,7 +231,7 @@ func waitForMergeEvents(s *state, action WaitForSync) { require.Fail(s.t, "doc index %d out of range", docIndex) } docID := s.docIDs[0][docIndex].String() - actual, hasActual := node.p2p.actualDocHeads[docID] + actual, hasActual := node.p2p.actualDAGHeads[docID] if !hasActual || !actual.decrypted { expectDecrypted[docID] = struct{}{} } @@ -260,11 +261,11 @@ func waitForMergeEvents(s *state, action WaitForSync) { delete(expectDecrypted, evt.Merge.DocID) } - head, ok := expect[evt.Merge.DocID] + head, ok := expect[getMergeEventKey(evt.Merge)] if ok && head.String() == evt.Merge.Cid.String() { - delete(expect, evt.Merge.DocID) + delete(expect, getMergeEventKey(evt.Merge)) } - node.p2p.actualDocHeads[evt.Merge.DocID] = docHeadState{cid: evt.Merge.Cid, decrypted: evt.Decrypted} + node.p2p.actualDAGHeads[getMergeEventKey(evt.Merge)] = docHeadState{cid: evt.Merge.Cid, decrypted: evt.Decrypted} } } } @@ -284,23 +285,23 @@ func updateNetworkState(s *state, nodeID int, evt event.Update) { // update the actual document head on the node that updated it // as the node created the document, it is already decrypted - node.p2p.actualDocHeads[evt.DocID] = docHeadState{cid: evt.Cid, decrypted: true} + node.p2p.actualDAGHeads[getUpdateEventKey(evt)] = docHeadState{cid: evt.Cid, decrypted: true} // update the expected document heads of replicator targets for id := range node.p2p.replicators { // replicator target nodes push updates to source nodes - s.nodes[id].p2p.expectedDocHeads[evt.DocID] = evt.Cid + s.nodes[id].p2p.expectedDAGHeads[getUpdateEventKey(evt)] = evt.Cid } // update the expected document heads of connected nodes for id := range node.p2p.connections { // connected nodes share updates of documents they have in common - if _, ok := s.nodes[id].p2p.actualDocHeads[evt.DocID]; ok { - s.nodes[id].p2p.expectedDocHeads[evt.DocID] = evt.Cid + if _, ok := s.nodes[id].p2p.actualDAGHeads[getUpdateEventKey(evt)]; ok { + s.nodes[id].p2p.expectedDAGHeads[getUpdateEventKey(evt)] = evt.Cid } // peer collection subscribers receive updates from any other subscriber node if _, ok := s.nodes[id].p2p.peerCollections[collectionID]; ok { - s.nodes[id].p2p.expectedDocHeads[evt.DocID] = evt.Cid + s.nodes[id].p2p.expectedDAGHeads[getUpdateEventKey(evt)] = evt.Cid } } @@ -312,21 +313,24 @@ func updateNetworkState(s *state, nodeID int, evt event.Update) { // getEventsForUpdateDoc returns a map of docIDs that should be // published to the local event bus after an UpdateDoc action. -func getEventsForUpdateDoc(s *state, action UpdateDoc) map[string]struct{} { +func getEventsForUpdateDoc(s *state, action UpdateDoc) []map[string]struct{} { docID := s.docIDs[action.CollectionID][action.DocID] docMap := make(map[string]any) err := json.Unmarshal([]byte(action.Doc), &docMap) require.NoError(s.t, err) - return map[string]struct{}{ + expect := make([]map[string]struct{}, action.CollectionID+1) + expect[action.CollectionID] = map[string]struct{}{ docID.String(): {}, } + + return expect } // getEventsForCreateDoc returns a map of docIDs that should be // published to the local event bus after a CreateDoc action. -func getEventsForCreateDoc(s *state, action CreateDoc) map[string]struct{} { +func getEventsForCreateDoc(s *state, action CreateDoc) []map[string]struct{} { var collection client.Collection if action.NodeID.HasValue() { collection = s.nodes[action.NodeID.Value()].collections[action.CollectionID] @@ -337,10 +341,11 @@ func getEventsForCreateDoc(s *state, action CreateDoc) map[string]struct{} { docs, err := parseCreateDocs(action, collection) require.NoError(s.t, err) - expect := make(map[string]struct{}) + expect := make([]map[string]struct{}, action.CollectionID+1) + expect[action.CollectionID] = map[string]struct{}{} for _, doc := range docs { - expect[doc.ID().String()] = struct{}{} + expect[action.CollectionID][doc.ID().String()] = struct{}{} } return expect @@ -356,16 +361,41 @@ func getEventsForUpdateWithFilter( s *state, action UpdateWithFilter, result *client.UpdateResult, -) map[string]struct{} { +) []map[string]struct{} { var docPatch map[string]any err := json.Unmarshal([]byte(action.Updater), &docPatch) require.NoError(s.t, err) - expect := make(map[string]struct{}) + expect := make([]map[string]struct{}, action.CollectionID+1) + expect[action.CollectionID] = map[string]struct{}{} for _, docID := range result.DocIDs { - expect[docID] = struct{}{} + expect[action.CollectionID][docID] = struct{}{} } return expect } + +// getUpdateEventKey gets the identifier to which this event is scoped to. +// +// For example, if this is scoped to a document, the document ID will be +// returned. If it is scoped to a schema, the schema root will be returned. +func getUpdateEventKey(evt event.Update) string { + if evt.DocID == "" { + return evt.SchemaRoot + } + + return evt.DocID +} + +// getMergeEventKey gets the identifier to which this event is scoped to. +// +// For example, if this is scoped to a document, the document ID will be +// returned. If it is scoped to a schema, the schema root will be returned. +func getMergeEventKey(evt event.Merge) string { + if evt.DocID == "" { + return evt.SchemaRoot + } + + return evt.DocID +} diff --git a/tests/integration/state.go b/tests/integration/state.go index c163a2d9d3..c495f80d9e 100644 --- a/tests/integration/state.go +++ b/tests/integration/state.go @@ -42,15 +42,21 @@ type p2pState struct { // The map key is the node id of the subscriber. peerCollections map[int]struct{} - // actualDocHeads contains all document heads that exist on a node. + // actualDAGHeads contains all DAG heads that exist on a node. // // The map key is the doc id. The map value is the doc head. - actualDocHeads map[string]docHeadState + // + // This tracks composite commits for documents, and collection commits for + // branchable collections + actualDAGHeads map[string]docHeadState - // expectedDocHeads contains all document heads that are expected to exist on a node. + // expectedDAGHeads contains all DAG heads that are expected to exist on a node. // - // The map key is the doc id. The map value is the doc head. - expectedDocHeads map[string]cid.Cid + // The map key is the doc id. The map value is the DAG head. + // + // This tracks composite commits for documents, and collection commits for + // branchable collections + expectedDAGHeads map[string]cid.Cid } // docHeadState contains the state of a document head. @@ -68,8 +74,8 @@ func newP2PState() *p2pState { connections: make(map[int]struct{}), replicators: make(map[int]struct{}), peerCollections: make(map[int]struct{}), - actualDocHeads: make(map[string]docHeadState), - expectedDocHeads: make(map[string]cid.Cid), + actualDAGHeads: make(map[string]docHeadState), + expectedDAGHeads: make(map[string]cid.Cid), } } diff --git a/tests/integration/utils.go b/tests/integration/utils.go index 39c9ea9624..66f4f48fe5 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -1404,10 +1404,12 @@ func deleteDoc( assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised) if action.ExpectedError == "" { - docIDs := map[string]struct{}{ + expect := make([]map[string]struct{}, action.CollectionID+1) + expect[action.CollectionID] = map[string]struct{}{ docID.String(): {}, } - waitForUpdateEvents(s, action.NodeID, docIDs) + + waitForUpdateEvents(s, action.NodeID, expect) } }