Skip to content

Commit

Permalink
PR FIXUP - Fix waitForSync test action
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewSisley committed Nov 15, 2024
1 parent 01fba9c commit d1906b2
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 47 deletions.
7 changes: 6 additions & 1 deletion tests/integration/acp.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,12 @@ func addDocActorRelationshipACP(
}

if action.ExpectedError == "" && !action.ExpectedExistence {
waitForUpdateEvents(s, actionNodeID, map[string]struct{}{docID: {}})
expect := make([]map[string]struct{}, action.CollectionID+1)
expect[action.CollectionID] = map[string]struct{}{
docID: {},
}

waitForUpdateEvents(s, actionNodeID, expect)
}
}

Expand Down
104 changes: 67 additions & 37 deletions tests/integration/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ func waitForReplicatorConfigureEvent(s *state, cfg ConfigureReplicator) {
}

// all previous documents should be merged on the subscriber node
for key, val := range s.nodes[cfg.SourceNodeID].p2p.actualDocHeads {
s.nodes[cfg.TargetNodeID].p2p.expectedDocHeads[key] = val.cid
for key, val := range s.nodes[cfg.SourceNodeID].p2p.actualDAGHeads {
s.nodes[cfg.TargetNodeID].p2p.expectedDAGHeads[key] = val.cid
}

// update node connections and replicators
Expand Down Expand Up @@ -153,7 +153,7 @@ func waitForUnsubscribeToCollectionEvent(s *state, action UnsubscribeToCollectio
func waitForUpdateEvents(
s *state,
nodeID immutable.Option[int],
docIDs map[string]struct{},
docIDs []map[string]struct{},
) {
for i := 0; i < len(s.nodes); i++ {
if nodeID.HasValue() && nodeID.Value() != i {
Expand All @@ -166,8 +166,15 @@ func waitForUpdateEvents(
}

expect := make(map[string]struct{}, len(docIDs))
for k := range docIDs {
expect[k] = struct{}{}
for collectionIndex, collectionDocIDs := range docIDs {
for k := range collectionDocIDs {
expect[k] = struct{}{}

col := node.collections[collectionIndex]
if col.Description().IsBranchable {
expect[col.SchemaRoot()] = struct{}{}
}
}
}

for len(expect) > 0 {
Expand All @@ -183,16 +190,10 @@ func waitForUpdateEvents(
require.Fail(s.t, "timeout waiting for update event", "Node %d", i)
}

if evt.DocID == "" {
// Todo: This will almost certainly need to change once P2P for collection-level commits
// is enabled. See: https://github.com/sourcenetwork/defradb/issues/3212
continue
}

// make sure the event is expected
_, ok := expect[evt.DocID]
require.True(s.t, ok, "unexpected document update", "Node %d", i)
delete(expect, evt.DocID)
_, ok := expect[getUpdateEventKey(evt)]
require.True(s.t, ok, "unexpected document update", getUpdateEventKey(evt))
delete(expect, getUpdateEventKey(evt))

// we only need to update the network state if the nodes
// are configured for networking
Expand All @@ -203,7 +204,7 @@ func waitForUpdateEvents(
}
}

// waitForMergeEvents waits for all expected document heads to be merged to all nodes.
// waitForMergeEvents waits for all expected heads to be merged to all nodes.
//
// Will fail the test if an event is not received within the expected time interval to prevent tests
// from running forever.
Expand All @@ -214,11 +215,11 @@ func waitForMergeEvents(s *state, action WaitForSync) {
continue // node is closed
}

expect := node.p2p.expectedDocHeads
expect := node.p2p.expectedDAGHeads

// remove any docs that are already merged
// up to the expected document head
for key, val := range node.p2p.actualDocHeads {
// remove any heads that are already merged
// up to the expected head
for key, val := range node.p2p.actualDAGHeads {
if head, ok := expect[key]; ok && head.String() == val.cid.String() {
delete(expect, key)
}
Expand All @@ -230,13 +231,13 @@ func waitForMergeEvents(s *state, action WaitForSync) {
require.Fail(s.t, "doc index %d out of range", docIndex)
}
docID := s.docIDs[0][docIndex].String()
actual, hasActual := node.p2p.actualDocHeads[docID]
actual, hasActual := node.p2p.actualDAGHeads[docID]
if !hasActual || !actual.decrypted {
expectDecrypted[docID] = struct{}{}
}
}

// wait for all expected doc heads to be merged
// wait for all expected heads to be merged
//
// the order of merges does not matter as we only
// expect the latest head to eventually be merged
Expand All @@ -260,11 +261,11 @@ func waitForMergeEvents(s *state, action WaitForSync) {
delete(expectDecrypted, evt.Merge.DocID)
}

head, ok := expect[evt.Merge.DocID]
head, ok := expect[getMergeEventKey(evt.Merge)]
if ok && head.String() == evt.Merge.Cid.String() {
delete(expect, evt.Merge.DocID)
delete(expect, getMergeEventKey(evt.Merge))
}
node.p2p.actualDocHeads[evt.Merge.DocID] = docHeadState{cid: evt.Merge.Cid, decrypted: evt.Decrypted}
node.p2p.actualDAGHeads[getMergeEventKey(evt.Merge)] = docHeadState{cid: evt.Merge.Cid, decrypted: evt.Decrypted}
}
}
}
Expand All @@ -284,23 +285,23 @@ func updateNetworkState(s *state, nodeID int, evt event.Update) {

// update the actual document head on the node that updated it
// as the node created the document, it is already decrypted
node.p2p.actualDocHeads[evt.DocID] = docHeadState{cid: evt.Cid, decrypted: true}
node.p2p.actualDAGHeads[getUpdateEventKey(evt)] = docHeadState{cid: evt.Cid, decrypted: true}

// update the expected document heads of replicator targets
for id := range node.p2p.replicators {
// replicator target nodes push updates to source nodes
s.nodes[id].p2p.expectedDocHeads[evt.DocID] = evt.Cid
s.nodes[id].p2p.expectedDAGHeads[getUpdateEventKey(evt)] = evt.Cid
}

// update the expected document heads of connected nodes
for id := range node.p2p.connections {
// connected nodes share updates of documents they have in common
if _, ok := s.nodes[id].p2p.actualDocHeads[evt.DocID]; ok {
s.nodes[id].p2p.expectedDocHeads[evt.DocID] = evt.Cid
if _, ok := s.nodes[id].p2p.actualDAGHeads[getUpdateEventKey(evt)]; ok {
s.nodes[id].p2p.expectedDAGHeads[getUpdateEventKey(evt)] = evt.Cid
}
// peer collection subscribers receive updates from any other subscriber node
if _, ok := s.nodes[id].p2p.peerCollections[collectionID]; ok {
s.nodes[id].p2p.expectedDocHeads[evt.DocID] = evt.Cid
s.nodes[id].p2p.expectedDAGHeads[getUpdateEventKey(evt)] = evt.Cid
}
}

Expand All @@ -312,21 +313,24 @@ func updateNetworkState(s *state, nodeID int, evt event.Update) {

// getEventsForUpdateDoc returns a map of docIDs that should be
// published to the local event bus after an UpdateDoc action.
func getEventsForUpdateDoc(s *state, action UpdateDoc) map[string]struct{} {
func getEventsForUpdateDoc(s *state, action UpdateDoc) []map[string]struct{} {
docID := s.docIDs[action.CollectionID][action.DocID]

docMap := make(map[string]any)
err := json.Unmarshal([]byte(action.Doc), &docMap)
require.NoError(s.t, err)

return map[string]struct{}{
expect := make([]map[string]struct{}, action.CollectionID+1)
expect[action.CollectionID] = map[string]struct{}{
docID.String(): {},
}

return expect
}

// getEventsForCreateDoc returns a map of docIDs that should be
// published to the local event bus after a CreateDoc action.
func getEventsForCreateDoc(s *state, action CreateDoc) map[string]struct{} {
func getEventsForCreateDoc(s *state, action CreateDoc) []map[string]struct{} {
var collection client.Collection
if action.NodeID.HasValue() {
collection = s.nodes[action.NodeID.Value()].collections[action.CollectionID]
Expand All @@ -337,10 +341,11 @@ func getEventsForCreateDoc(s *state, action CreateDoc) map[string]struct{} {
docs, err := parseCreateDocs(action, collection)
require.NoError(s.t, err)

expect := make(map[string]struct{})
expect := make([]map[string]struct{}, action.CollectionID+1)
expect[action.CollectionID] = map[string]struct{}{}

for _, doc := range docs {
expect[doc.ID().String()] = struct{}{}
expect[action.CollectionID][doc.ID().String()] = struct{}{}
}

return expect
Expand All @@ -356,16 +361,41 @@ func getEventsForUpdateWithFilter(
s *state,
action UpdateWithFilter,
result *client.UpdateResult,
) map[string]struct{} {
) []map[string]struct{} {
var docPatch map[string]any
err := json.Unmarshal([]byte(action.Updater), &docPatch)
require.NoError(s.t, err)

expect := make(map[string]struct{})
expect := make([]map[string]struct{}, action.CollectionID+1)
expect[action.CollectionID] = map[string]struct{}{}

for _, docID := range result.DocIDs {
expect[docID] = struct{}{}
expect[action.CollectionID][docID] = struct{}{}
}

return expect
}

// getUpdateEventKey gets the identifier to which this event is scoped to.
//
// For example, if this is scoped to a document, the document ID will be
// returned. If it is scoped to a schema, the schema root will be returned.
func getUpdateEventKey(evt event.Update) string {
if evt.DocID == "" {
return evt.SchemaRoot
}

return evt.DocID
}

// getMergeEventKey gets the identifier to which this event is scoped to.
//
// For example, if this is scoped to a document, the document ID will be
// returned. If it is scoped to a schema, the schema root will be returned.
func getMergeEventKey(evt event.Merge) string {
if evt.DocID == "" {
return evt.SchemaRoot
}

return evt.DocID
}
20 changes: 13 additions & 7 deletions tests/integration/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,21 @@ type p2pState struct {
// The map key is the node id of the subscriber.
peerCollections map[int]struct{}

// actualDocHeads contains all document heads that exist on a node.
// actualDAGHeads contains all DAG heads that exist on a node.
//
// The map key is the doc id. The map value is the doc head.
actualDocHeads map[string]docHeadState
//
// This tracks composite commits for documents, and collection commits for
// branchable collections
actualDAGHeads map[string]docHeadState

// expectedDocHeads contains all document heads that are expected to exist on a node.
// expectedDAGHeads contains all DAG heads that are expected to exist on a node.
//
// The map key is the doc id. The map value is the doc head.
expectedDocHeads map[string]cid.Cid
// The map key is the doc id. The map value is the DAG head.
//
// This tracks composite commits for documents, and collection commits for
// branchable collections
expectedDAGHeads map[string]cid.Cid
}

// docHeadState contains the state of a document head.
Expand All @@ -68,8 +74,8 @@ func newP2PState() *p2pState {
connections: make(map[int]struct{}),
replicators: make(map[int]struct{}),
peerCollections: make(map[int]struct{}),
actualDocHeads: make(map[string]docHeadState),
expectedDocHeads: make(map[string]cid.Cid),
actualDAGHeads: make(map[string]docHeadState),
expectedDAGHeads: make(map[string]cid.Cid),
}
}

Expand Down
6 changes: 4 additions & 2 deletions tests/integration/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1404,10 +1404,12 @@ func deleteDoc(
assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised)

if action.ExpectedError == "" {
docIDs := map[string]struct{}{
expect := make([]map[string]struct{}, action.CollectionID+1)
expect[action.CollectionID] = map[string]struct{}{
docID.String(): {},
}
waitForUpdateEvents(s, action.NodeID, docIDs)

waitForUpdateEvents(s, action.NodeID, expect)
}
}

Expand Down

0 comments on commit d1906b2

Please sign in to comment.