From e25b68ad87cccc2245d906f3f616af13bb32d6e0 Mon Sep 17 00:00:00 2001 From: Keenan Nemetz Date: Fri, 21 Jun 2024 12:04:13 -0700 Subject: [PATCH 01/14] refactor network test actions --- tests/integration/p2p.go | 331 ++++++++++++------------------------ tests/integration/state.go | 48 +++++- tests/integration/utils2.go | 75 ++++---- 3 files changed, 191 insertions(+), 263 deletions(-) diff --git a/tests/integration/p2p.go b/tests/integration/p2p.go index d990a3d322..0a6c6901e0 100644 --- a/tests/integration/p2p.go +++ b/tests/integration/p2p.go @@ -11,6 +11,7 @@ package tests import ( + "context" "time" "github.com/sourcenetwork/defradb/client" @@ -19,6 +20,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/sourcenetwork/corelog" + "github.com/sourcenetwork/immutable" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -158,119 +160,13 @@ func connectPeers( log.InfoContext(s.ctx, "Bootstrapping with peers", corelog.Any("Addresses", addrs)) sourceNode.Bootstrap(addrs) + s.nodeConnections[cfg.SourceNodeID][cfg.TargetNodeID] = struct{}{} + s.nodeConnections[cfg.TargetNodeID][cfg.SourceNodeID] = struct{}{} + // Bootstrap triggers a bunch of async stuff for which we have no good way of waiting on. It must be // allowed to complete before documentation begins or it will not even try and sync it. So for now, we // sleep a little. time.Sleep(100 * time.Millisecond) - setupPeerWaitSync(s, 0, cfg) -} - -func setupPeerWaitSync( - s *state, - startIndex int, - cfg ConnectPeers, -) { - sourceToTargetEvents := []int{0} - targetToSourceEvents := []int{0} - - nodeCollections := map[int][]int{} - waitIndex := 0 - for i := startIndex; i < len(s.testCase.Actions); i++ { - switch action := s.testCase.Actions[i].(type) { - case SubscribeToCollection: - if action.ExpectedError != "" { - // If the subscription action is expected to error, then we should do nothing here. - continue - } - // This is order dependent, items should be added in the same action-loop that reads them - // as 'stuff' done before collection subscription should not be synced. - nodeCollections[action.NodeID] = append(nodeCollections[action.NodeID], action.CollectionIDs...) - - case UnsubscribeToCollection: - if action.ExpectedError != "" { - // If the unsubscribe action is expected to error, then we should do nothing here. - continue - } - - // This is order dependent, items should be added in the same action-loop that reads them - // as 'stuff' done before collection subscription should not be synced. - existingCollectionIndexes := nodeCollections[action.NodeID] - for _, collectionIndex := range action.CollectionIDs { - for i, existingCollectionIndex := range existingCollectionIndexes { - if collectionIndex == existingCollectionIndex { - // Remove the matching collection index from the set: - existingCollectionIndexes = append(existingCollectionIndexes[:i], existingCollectionIndexes[i+1:]...) - } - } - } - nodeCollections[action.NodeID] = existingCollectionIndexes - - case CreateDoc: - sourceCollectionSubscribed := collectionSubscribedTo(nodeCollections, cfg.SourceNodeID, action.CollectionID) - targetCollectionSubscribed := collectionSubscribedTo(nodeCollections, cfg.TargetNodeID, action.CollectionID) - - // Peers sync trigger sync events for documents that exist prior to configuration, even if they already - // exist at the destination, so we need to wait for documents created on all nodes, as well as those - // created on the target. - if (!action.NodeID.HasValue() || - action.NodeID.Value() == cfg.TargetNodeID) && - sourceCollectionSubscribed { - targetToSourceEvents[waitIndex] += 1 - } - - // Peers sync trigger sync events for documents that exist prior to configuration, even if they already - // exist at the destination, so we need to wait for documents created on all nodes, as well as those - // created on the source. - if (!action.NodeID.HasValue() || - action.NodeID.Value() == cfg.SourceNodeID) && - targetCollectionSubscribed { - sourceToTargetEvents[waitIndex] += 1 - } - - case DeleteDoc: - // Updates to existing docs should always sync (no-sub required) - if !action.DontSync && action.NodeID.HasValue() && action.NodeID.Value() == cfg.TargetNodeID { - targetToSourceEvents[waitIndex] += 1 - } - if !action.DontSync && action.NodeID.HasValue() && action.NodeID.Value() == cfg.SourceNodeID { - sourceToTargetEvents[waitIndex] += 1 - } - - case UpdateDoc: - // Updates to existing docs should always sync (no-sub required) - if !action.DontSync && action.NodeID.HasValue() && action.NodeID.Value() == cfg.TargetNodeID { - targetToSourceEvents[waitIndex] += 1 - } - if !action.DontSync && action.NodeID.HasValue() && action.NodeID.Value() == cfg.SourceNodeID { - sourceToTargetEvents[waitIndex] += 1 - } - - case WaitForSync: - waitIndex += 1 - targetToSourceEvents = append(targetToSourceEvents, 0) - sourceToTargetEvents = append(sourceToTargetEvents, 0) - } - } - - nodeSynced := make(chan struct{}) - go waitForMerge(s, cfg.SourceNodeID, cfg.TargetNodeID, sourceToTargetEvents, targetToSourceEvents, nodeSynced) - s.syncChans = append(s.syncChans, nodeSynced) -} - -// collectionSubscribedTo returns true if the collection on the given node -// has been subscribed to. -func collectionSubscribedTo( - nodeCollections map[int][]int, - nodeID int, - collectionID int, -) bool { - targetSubscriptionCollections := nodeCollections[nodeID] - for _, collectionId := range targetSubscriptionCollections { - if collectionId == collectionID { - return true - } - } - return false } // configureReplicator configures a replicator relationship between two existing, started, nodes. @@ -294,8 +190,14 @@ func configureReplicator( expectedErrorRaised := AssertError(s.t, s.testCase.Description, err, cfg.ExpectedError) assertExpectedErrorRaised(s.t, s.testCase.Description, cfg.ExpectedError, expectedErrorRaised) + if err == nil { - setupReplicatorWaitSync(s, 0, cfg) + // all previous documents should be merged on the subscriber node + for key, val := range s.actualDocHeads[cfg.SourceNodeID] { + s.expectedDocHeads[cfg.TargetNodeID][key] = val + } + s.nodeReplicatorTargets[cfg.TargetNodeID][cfg.SourceNodeID] = struct{}{} + s.nodeReplicatorSources[cfg.SourceNodeID][cfg.TargetNodeID] = struct{}{} } } @@ -310,64 +212,8 @@ func deleteReplicator( Info: targetNode.PeerInfo(), }) require.NoError(s.t, err) -} - -func setupReplicatorWaitSync( - s *state, - startIndex int, - cfg ConfigureReplicator, -) { - sourceToTargetEvents := []int{0} - targetToSourceEvents := []int{0} - - docIDsSyncedToSource := map[int]struct{}{} - waitIndex := 0 - currentDocID := 0 - for i := startIndex; i < len(s.testCase.Actions); i++ { - switch action := s.testCase.Actions[i].(type) { - case CreateDoc: - if !action.NodeID.HasValue() || action.NodeID.Value() == cfg.SourceNodeID { - docIDsSyncedToSource[currentDocID] = struct{}{} - } - - // A document created on the source or one that is created on all nodes will be sent to the target even - // it already has it. It will create a `received push log` event on the target which we need to wait for. - if !action.NodeID.HasValue() || action.NodeID.Value() == cfg.SourceNodeID { - sourceToTargetEvents[waitIndex] += 1 - } - - currentDocID++ - - case DeleteDoc: - if _, shouldSyncFromTarget := docIDsSyncedToSource[action.DocID]; shouldSyncFromTarget && - action.NodeID.HasValue() && action.NodeID.Value() == cfg.TargetNodeID { - targetToSourceEvents[waitIndex] += 1 - } - - if action.NodeID.HasValue() && action.NodeID.Value() == cfg.SourceNodeID { - sourceToTargetEvents[waitIndex] += 1 - } - - case UpdateDoc: - if _, shouldSyncFromTarget := docIDsSyncedToSource[action.DocID]; shouldSyncFromTarget && - action.NodeID.HasValue() && action.NodeID.Value() == cfg.TargetNodeID { - targetToSourceEvents[waitIndex] += 1 - } - - if action.NodeID.HasValue() && action.NodeID.Value() == cfg.SourceNodeID { - sourceToTargetEvents[waitIndex] += 1 - } - - case WaitForSync: - waitIndex += 1 - targetToSourceEvents = append(targetToSourceEvents, 0) - sourceToTargetEvents = append(sourceToTargetEvents, 0) - } - } - - nodeSynced := make(chan struct{}) - go waitForMerge(s, cfg.SourceNodeID, cfg.TargetNodeID, sourceToTargetEvents, targetToSourceEvents, nodeSynced) - s.syncChans = append(s.syncChans, nodeSynced) + delete(s.nodeReplicatorTargets[cfg.TargetNodeID], cfg.SourceNodeID) + delete(s.nodeReplicatorSources[cfg.SourceNodeID], cfg.TargetNodeID) } // subscribeToCollection sets up a collection subscription on the given node/collection. @@ -385,7 +231,15 @@ func subscribeToCollection( schemaRoots = append(schemaRoots, NonExistentCollectionSchemaRoot) continue } - + if action.ExpectedError == "" { + // all previous documents should be merged on the subscriber node + if collectionIndex < len(s.documents) { + for _, doc := range s.documents[collectionIndex] { + s.expectedDocHeads[action.NodeID][doc.ID().String()] = doc.Head() + } + } + s.nodePeerCollections[collectionIndex][action.NodeID] = struct{}{} + } col := s.collections[action.NodeID][collectionIndex] schemaRoots = append(schemaRoots, col.SchemaRoot()) } @@ -415,7 +269,9 @@ func unsubscribeToCollection( schemaRoots = append(schemaRoots, NonExistentCollectionSchemaRoot) continue } - + if action.ExpectedError == "" { + delete(s.nodePeerCollections[collectionIndex], action.NodeID) + } col := s.collections[action.NodeID][collectionIndex] schemaRoots = append(schemaRoots, col.SchemaRoot()) } @@ -451,7 +307,61 @@ func getAllP2PCollections( assert.Equal(s.t, expectedCollections, cols) } -// waitForSync waits for all given wait channels to receive an item signaling completion. +// waitForUpdateEvents waits for all selected nodes to publish an update event. +// +// Expected document heads will be updated for all nodes that should receive network merges. +func waitForUpdateEvents( + s *state, + nodeID immutable.Option[int], + collectionID int, +) { + if len(s.nodeUpdateSubs) == 0 { + return // skip network testing + } + + ctx, cancel := context.WithTimeout(s.ctx, subscriptionTimeout*10) + defer cancel() + + for i := 0; i < len(s.nodes); i++ { + if nodeID.HasValue() && nodeID.Value() != i { + continue // node is not selected + } + + var evt event.Update + select { + case msg := <-s.nodeUpdateSubs[i].Message(): + evt = msg.Data.(event.Update) + case <-ctx.Done(): + s.t.Fatalf("timeout waiting for update event") + } + + // update the actual document heads + s.actualDocHeads[i][evt.DocID] = evt.Cid + + // update the expected document heads of connected nodes + for id := range s.nodeConnections[i] { + if _, ok := s.actualDocHeads[id][evt.DocID]; ok { + s.expectedDocHeads[id][evt.DocID] = evt.Cid + } + } + // update the expected document heads of replicator sources + for id := range s.nodeReplicatorTargets[i] { + if _, ok := s.actualDocHeads[id][evt.DocID]; ok { + s.expectedDocHeads[id][evt.DocID] = evt.Cid + } + } + // update the expected document heads of replicator targets + for id := range s.nodeReplicatorSources[i] { + s.expectedDocHeads[id][evt.DocID] = evt.Cid + } + // update the expected document heads of peer collection subs + for id := range s.nodePeerCollections[collectionID] { + s.expectedDocHeads[id][evt.DocID] = evt.Cid + } + } +} + +// waitForSync waits for all expected document heads to be merged to all nodes. // // Will fail the test if an event is not received within the expected time interval to prevent tests // from running forever. @@ -466,65 +376,36 @@ func waitForSync( timeout = subscriptionTimeout * 10 } - for _, resultsChan := range s.syncChans { - select { - case <-resultsChan: - assert.True( - s.t, - action.ExpectedTimeout == 0, - "unexpected document has been synced", - s.testCase.Description, - ) - - // a safety in case the stream hangs - we don't want the tests to run forever. - case <-time.After(timeout): - assert.True( - s.t, - action.ExpectedTimeout != 0, - "timeout occurred while waiting for data stream", - s.testCase.Description, - ) - } - } -} + ctx, cancel := context.WithTimeout(s.ctx, timeout) + defer cancel() -// waitForMerge waits for the source and target nodes to synchronize their state -// by listening to merge events sent from the network subsystem on the event bus. -// -// sourceToTargetEvents and targetToSourceEvents are slices containing the number -// of expected merge events to be received after each test action has executed. -func waitForMerge( - s *state, - sourceNodeID int, - targetNodeID int, - sourceToTargetEvents []int, - targetToSourceEvents []int, - nodeSynced chan struct{}, -) { - sourceSub := s.eventSubs[sourceNodeID] - targetSub := s.eventSubs[targetNodeID] - - sourcePeerInfo := s.nodeAddresses[sourceNodeID] - targetPeerInfo := s.nodeAddresses[targetNodeID] - - for waitIndex := 0; waitIndex < len(sourceToTargetEvents); waitIndex++ { - for i := 0; i < targetToSourceEvents[waitIndex]; i++ { - // wait for message or unsubscribe - msg, ok := <-sourceSub.Message() - if ok { - // ensure the message is sent from the target node - require.Equal(s.t, targetPeerInfo.ID, msg.Data.(event.Merge).ByPeer) + for nodeID, expect := range s.expectedDocHeads { + // remove any docs that are already merged + // up to the expected document head + for key, val := range s.actualDocHeads[nodeID] { + if head, ok := expect[key]; ok && head.String() == val.String() { + delete(expect, key) } } - for i := 0; i < sourceToTargetEvents[waitIndex]; i++ { - // wait for message or unsubscribe - msg, ok := <-targetSub.Message() - if ok { - // ensure the message is sent from the source node - require.Equal(s.t, sourcePeerInfo.ID, msg.Data.(event.Merge).ByPeer) + // wait for all expected doc heads to be merged + // + // the order of merges does not matter as we only + // expect the latest head to eventually be merged + // + // unexpected merge events are ignored + for len(expect) > 0 { + var evt event.Merge + select { + case msg := <-s.nodeMergeCompleteSubs[nodeID].Message(): + evt = msg.Data.(event.Merge) + case <-ctx.Done(): + s.t.Fatalf("timeout waiting for merge complete event") + } + head, ok := expect[evt.DocID] + if ok && head.String() == evt.Cid.String() { + delete(expect, evt.DocID) } } - nodeSynced <- struct{}{} } } diff --git a/tests/integration/state.go b/tests/integration/state.go index 47affa8160..0aeaa417fc 100644 --- a/tests/integration/state.go +++ b/tests/integration/state.go @@ -14,6 +14,7 @@ import ( "context" "testing" + "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p/core/peer" "github.com/sourcenetwork/defradb/client" @@ -50,11 +51,41 @@ type state struct { // These channels will recieve a function which asserts results of any subscription requests. subscriptionResultsChans []chan func() - // These synchronisation channels allow async actions to track their completion. - syncChans []chan struct{} + // nodeMergeCompleteSubs is a list of all merge complete event subscriptions + nodeMergeCompleteSubs []*event.Subscription - // eventSubs is a list of all event subscriptions - eventSubs []*event.Subscription + // nodeUpdateSubs is a list of all update event subscriptions + nodeUpdateSubs []*event.Subscription + + // nodeConnections contains all connected nodes. + // + // The index of the slice is the node id. The map key is the connected node id. + nodeConnections []map[int]struct{} + + // nodeReplicatorSources contains all active replicators. + // + // The index of the slice is the source node id. The map key is the target node id. + nodeReplicatorSources []map[int]struct{} + + // nodeReplicatorTargets contains all active replicators. + // + // The index of the slice is the target node id. The map key is the source node id. + nodeReplicatorTargets []map[int]struct{} + + // nodePeerCollections contains all active peer collection subscriptions. + // + // The index of the slice is the collection id. The map key is the node id of the subscriber. + nodePeerCollections []map[int]struct{} + + // actualDocHeads contains all document heads that exist on a node. + // + // The index of the slice is the node id. The map key is the doc id. The map value is the doc head. + actualDocHeads []map[string]cid.Cid + + // expectedDocHeads contains all document heads that are expected to exist on a node. + // + // The index of the slice is the node id. The map key is the doc id. The map value is the doc head. + expectedDocHeads []map[string]cid.Cid // The addresses of any nodes configured. nodeAddresses []peer.AddrInfo @@ -107,8 +138,13 @@ func newState( txns: []datastore.Txn{}, allActionsDone: make(chan struct{}), subscriptionResultsChans: []chan func(){}, - syncChans: []chan struct{}{}, - eventSubs: []*event.Subscription{}, + nodeMergeCompleteSubs: []*event.Subscription{}, + nodeConnections: []map[int]struct{}{}, + nodeReplicatorSources: []map[int]struct{}{}, + nodeReplicatorTargets: []map[int]struct{}{}, + nodePeerCollections: []map[int]struct{}{}, + actualDocHeads: []map[string]cid.Cid{}, + expectedDocHeads: []map[string]cid.Cid{}, nodeAddresses: []peer.AddrInfo{}, nodeConfigs: [][]net.NodeOpt{}, nodes: []clients.Client{}, diff --git a/tests/integration/utils2.go b/tests/integration/utils2.go index 5e9d089ccd..86d7505079 100644 --- a/tests/integration/utils2.go +++ b/tests/integration/utils2.go @@ -22,6 +22,7 @@ import ( "github.com/bxcodec/faker/support/slice" "github.com/fxamacker/cbor/v2" + "github.com/ipfs/go-cid" "github.com/sourcenetwork/corelog" "github.com/sourcenetwork/immutable" "github.com/stretchr/testify/assert" @@ -250,7 +251,7 @@ func performAction( configureNode(s, action) case Restart: - restartNodes(s, actionIndex) + restartNodes(s) case ConnectPeers: connectPeers(s, action) @@ -657,12 +658,17 @@ func setStartingNodes( s.nodes = append(s.nodes, c) s.dbPaths = append(s.dbPaths, path) + + s.nodeConnections = append(s.nodeConnections, make(map[int]struct{})) + s.nodeReplicatorSources = append(s.nodeReplicatorSources, make(map[int]struct{})) + s.nodeReplicatorTargets = append(s.nodeReplicatorTargets, make(map[int]struct{})) + s.expectedDocHeads = append(s.expectedDocHeads, make(map[string]cid.Cid)) + s.actualDocHeads = append(s.actualDocHeads, make(map[string]cid.Cid)) } } func restartNodes( s *state, - actionIndex int, ) { if s.dbt == badgerIMType || s.dbt == defraIMType { return @@ -710,36 +716,14 @@ func restartNodes( s.nodes[i] = c // subscribe to merge complete events - sub, err := c.Events().Subscribe(event.MergeCompleteName) + mergeCompleteSub, err := c.Events().Subscribe(event.MergeCompleteName) require.NoError(s.t, err) - s.eventSubs[i] = sub - } - - // The index of the action after the last wait action before the current restart action. - // We wish to resume the wait clock from this point onwards. - waitGroupStartIndex := 0 -actionLoop: - for i := actionIndex; i >= 0; i-- { - switch s.testCase.Actions[i].(type) { - case WaitForSync: - // +1 as we do not wish to resume from the wait itself, but the next action - // following it. This may be the current restart action. - waitGroupStartIndex = i + 1 - break actionLoop - } - } + s.nodeMergeCompleteSubs = append(s.nodeMergeCompleteSubs, mergeCompleteSub) - for _, tc := range s.testCase.Actions { - switch action := tc.(type) { - case ConnectPeers: - // Give the nodes a chance to connect to each other and learn about each other's subscribed topics. - time.Sleep(100 * time.Millisecond) - setupPeerWaitSync(s, waitGroupStartIndex, action) - case ConfigureReplicator: - // Give the nodes a chance to connect to each other and learn about each other's subscribed topics. - time.Sleep(100 * time.Millisecond) - setupReplicatorWaitSync(s, waitGroupStartIndex, action) - } + // subscribe to update events + updateSub, err := c.Events().Subscribe(event.UpdateName) + require.NoError(s.t, err) + s.nodeUpdateSubs = append(s.nodeUpdateSubs, updateSub) } // If the db was restarted we need to refresh the collection definitions as the old instances @@ -757,6 +741,10 @@ func refreshCollections( ) { s.collections = make([][]client.Collection, len(s.nodes)) + for i := len(s.nodePeerCollections); i < len(s.collectionNames); i++ { + s.nodePeerCollections = append(s.nodePeerCollections, make(map[int]struct{})) + } + for nodeID, node := range s.nodes { s.collections[nodeID] = make([]client.Collection, len(s.collectionNames)) allCollections, err := node.GetCollections(s.ctx, client.CollectionFetchOptions{}) @@ -815,10 +803,21 @@ func configureNode( s.nodes = append(s.nodes, c) s.dbPaths = append(s.dbPaths, path) + s.nodeConnections = append(s.nodeConnections, make(map[int]struct{})) + s.nodeReplicatorSources = append(s.nodeReplicatorSources, make(map[int]struct{})) + s.nodeReplicatorTargets = append(s.nodeReplicatorTargets, make(map[int]struct{})) + s.expectedDocHeads = append(s.expectedDocHeads, make(map[string]cid.Cid)) + s.actualDocHeads = append(s.actualDocHeads, make(map[string]cid.Cid)) + // subscribe to merge complete events - sub, err := c.Events().Subscribe(event.MergeCompleteName) + mergeCompleteSub, err := c.Events().Subscribe(event.MergeCompleteName) + require.NoError(s.t, err) + s.nodeMergeCompleteSubs = append(s.nodeMergeCompleteSubs, mergeCompleteSub) + + // subscribe to update events + updateSub, err := c.Events().Subscribe(event.UpdateName) require.NoError(s.t, err) - s.eventSubs = append(s.eventSubs, sub) + s.nodeUpdateSubs = append(s.nodeUpdateSubs, updateSub) } func refreshDocuments( @@ -1199,6 +1198,10 @@ func createDoc( s.documents = append(s.documents, make([][]*client.Document, action.CollectionID-len(s.documents)+1)...) } s.documents[action.CollectionID] = append(s.documents[action.CollectionID], doc) + + if action.ExpectedError == "" { + waitForUpdateEvents(s, action.NodeID, action.CollectionID) + } } func createDocViaColSave( @@ -1349,6 +1352,10 @@ func deleteDoc( } assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised) + + if action.ExpectedError == "" { + waitForUpdateEvents(s, action.NodeID, action.CollectionID) + } } // updateDoc updates a document using the chosen [mutationType]. @@ -1381,6 +1388,10 @@ func updateDoc( } assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised) + + if action.ExpectedError == "" { + waitForUpdateEvents(s, action.NodeID, action.CollectionID) + } } func updateDocViaColSave( From d832d324f5187aff14a1593415d4a932577f2fa5 Mon Sep 17 00:00:00 2001 From: Keenan Nemetz Date: Fri, 21 Jun 2024 12:21:27 -0700 Subject: [PATCH 02/14] fix node restart subscription --- tests/integration/utils2.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/utils2.go b/tests/integration/utils2.go index 86d7505079..e73278d902 100644 --- a/tests/integration/utils2.go +++ b/tests/integration/utils2.go @@ -718,12 +718,12 @@ func restartNodes( // subscribe to merge complete events mergeCompleteSub, err := c.Events().Subscribe(event.MergeCompleteName) require.NoError(s.t, err) - s.nodeMergeCompleteSubs = append(s.nodeMergeCompleteSubs, mergeCompleteSub) + s.nodeMergeCompleteSubs[i] = mergeCompleteSub // subscribe to update events updateSub, err := c.Events().Subscribe(event.UpdateName) require.NoError(s.t, err) - s.nodeUpdateSubs = append(s.nodeUpdateSubs, updateSub) + s.nodeUpdateSubs[i] = updateSub } // If the db was restarted we need to refresh the collection definitions as the old instances From b2995683069aa0e15f09492d4c48722ea648e30e Mon Sep 17 00:00:00 2001 From: Keenan Nemetz Date: Fri, 21 Jun 2024 12:55:37 -0700 Subject: [PATCH 03/14] fix peer collection expected docs --- tests/integration/p2p.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/integration/p2p.go b/tests/integration/p2p.go index 0a6c6901e0..a84c0dddc0 100644 --- a/tests/integration/p2p.go +++ b/tests/integration/p2p.go @@ -235,7 +235,9 @@ func subscribeToCollection( // all previous documents should be merged on the subscriber node if collectionIndex < len(s.documents) { for _, doc := range s.documents[collectionIndex] { - s.expectedDocHeads[action.NodeID][doc.ID().String()] = doc.Head() + for nodeID := range s.nodeConnections[action.NodeID] { + s.expectedDocHeads[action.NodeID][doc.ID().String()] = s.actualDocHeads[nodeID][doc.ID().String()] + } } } s.nodePeerCollections[collectionIndex][action.NodeID] = struct{}{} From 8fb16cf3509623742357bf35fb22092dbc6c5140 Mon Sep 17 00:00:00 2001 From: Keenan Nemetz Date: Tue, 16 Jul 2024 10:41:55 -0700 Subject: [PATCH 04/14] add test event state --- tests/integration/events.go | 121 ++++++++++++++++++++++++++++++++++++ tests/integration/p2p.go | 104 ------------------------------- tests/integration/state.go | 50 +++++++++++++-- tests/integration/utils2.go | 37 +++++------ 4 files changed, 182 insertions(+), 130 deletions(-) create mode 100644 tests/integration/events.go diff --git a/tests/integration/events.go b/tests/integration/events.go new file mode 100644 index 0000000000..583bc08e8b --- /dev/null +++ b/tests/integration/events.go @@ -0,0 +1,121 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tests + +import ( + "context" + "time" + + "github.com/sourcenetwork/defradb/event" + "github.com/sourcenetwork/immutable" + "github.com/stretchr/testify/require" +) + +// waitForUpdateEvents waits for all selected nodes to publish an update event to the local event bus. +// +// Expected document heads will be updated for all nodes that should receive network merges. +func waitForUpdateEvents( + s *state, + nodeID immutable.Option[int], + collectionID int, +) { + ctx, cancel := context.WithTimeout(s.ctx, subscriptionTimeout*10) + defer cancel() + + for i := 0; i < len(s.nodes); i++ { + if nodeID.HasValue() && nodeID.Value() != i { + continue // node is not selected + } + + var evt event.Update + select { + case msg := <-s.nodeEvents[i].update.Message(): + evt = msg.Data.(event.Update) + + case <-ctx.Done(): + require.Fail(s.t, "timeout waiting for update event") + } + + // update the actual document heads + s.actualDocHeads[i][evt.DocID] = evt.Cid + + // update the expected document heads of connected nodes + for id := range s.nodeConnections[i] { + if _, ok := s.actualDocHeads[id][evt.DocID]; ok { + s.expectedDocHeads[id][evt.DocID] = evt.Cid + } + } + // update the expected document heads of replicator sources + for id := range s.nodeReplicatorTargets[i] { + if _, ok := s.actualDocHeads[id][evt.DocID]; ok { + s.expectedDocHeads[id][evt.DocID] = evt.Cid + } + } + // update the expected document heads of replicator targets + for id := range s.nodeReplicatorSources[i] { + s.expectedDocHeads[id][evt.DocID] = evt.Cid + } + // update the expected document heads of peer collection subs + for id := range s.nodePeerCollections[collectionID] { + s.expectedDocHeads[id][evt.DocID] = evt.Cid + } + } +} + +// waitForMergeEvents waits for all expected document heads to be merged to all nodes. +// +// Will fail the test if an event is not received within the expected time interval to prevent tests +// from running forever. +func waitForMergeEvents( + s *state, + action WaitForSync, +) { + var timeout time.Duration + if action.ExpectedTimeout != 0 { + timeout = action.ExpectedTimeout + } else { + timeout = subscriptionTimeout * 10 + } + + ctx, cancel := context.WithTimeout(s.ctx, timeout) + defer cancel() + + for nodeID, expect := range s.expectedDocHeads { + // remove any docs that are already merged + // up to the expected document head + for key, val := range s.actualDocHeads[nodeID] { + if head, ok := expect[key]; ok && head.String() == val.String() { + delete(expect, key) + } + } + // wait for all expected doc heads to be merged + // + // the order of merges does not matter as we only + // expect the latest head to eventually be merged + // + // unexpected merge events are ignored + for len(expect) > 0 { + var evt event.Merge + select { + case msg := <-s.nodeEvents[nodeID].merge.Message(): + evt = msg.Data.(event.Merge) + + case <-ctx.Done(): + require.Fail(s.t, "timeout waiting for merge complete event") + } + + head, ok := expect[evt.DocID] + if ok && head.String() == evt.Cid.String() { + delete(expect, evt.DocID) + } + } + } +} diff --git a/tests/integration/p2p.go b/tests/integration/p2p.go index 672a2009eb..c957b25f90 100644 --- a/tests/integration/p2p.go +++ b/tests/integration/p2p.go @@ -11,7 +11,6 @@ package tests import ( - "context" "time" "github.com/sourcenetwork/defradb/client" @@ -20,7 +19,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/sourcenetwork/corelog" - "github.com/sourcenetwork/immutable" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -337,108 +335,6 @@ func getAllP2PCollections( assert.Equal(s.t, expectedCollections, cols) } -// waitForUpdateEvents waits for all selected nodes to publish an update event. -// -// Expected document heads will be updated for all nodes that should receive network merges. -func waitForUpdateEvents( - s *state, - nodeID immutable.Option[int], - collectionID int, -) { - if len(s.nodeUpdateSubs) == 0 { - return // skip network testing - } - - ctx, cancel := context.WithTimeout(s.ctx, subscriptionTimeout*10) - defer cancel() - - for i := 0; i < len(s.nodes); i++ { - if nodeID.HasValue() && nodeID.Value() != i { - continue // node is not selected - } - - var evt event.Update - select { - case msg := <-s.nodeUpdateSubs[i].Message(): - evt = msg.Data.(event.Update) - case <-ctx.Done(): - s.t.Fatalf("timeout waiting for update event") - } - - // update the actual document heads - s.actualDocHeads[i][evt.DocID] = evt.Cid - - // update the expected document heads of connected nodes - for id := range s.nodeConnections[i] { - if _, ok := s.actualDocHeads[id][evt.DocID]; ok { - s.expectedDocHeads[id][evt.DocID] = evt.Cid - } - } - // update the expected document heads of replicator sources - for id := range s.nodeReplicatorTargets[i] { - if _, ok := s.actualDocHeads[id][evt.DocID]; ok { - s.expectedDocHeads[id][evt.DocID] = evt.Cid - } - } - // update the expected document heads of replicator targets - for id := range s.nodeReplicatorSources[i] { - s.expectedDocHeads[id][evt.DocID] = evt.Cid - } - // update the expected document heads of peer collection subs - for id := range s.nodePeerCollections[collectionID] { - s.expectedDocHeads[id][evt.DocID] = evt.Cid - } - } -} - -// waitForSync waits for all expected document heads to be merged to all nodes. -// -// Will fail the test if an event is not received within the expected time interval to prevent tests -// from running forever. -func waitForSync( - s *state, - action WaitForSync, -) { - var timeout time.Duration - if action.ExpectedTimeout != 0 { - timeout = action.ExpectedTimeout - } else { - timeout = subscriptionTimeout * 10 - } - - ctx, cancel := context.WithTimeout(s.ctx, timeout) - defer cancel() - - for nodeID, expect := range s.expectedDocHeads { - // remove any docs that are already merged - // up to the expected document head - for key, val := range s.actualDocHeads[nodeID] { - if head, ok := expect[key]; ok && head.String() == val.String() { - delete(expect, key) - } - } - // wait for all expected doc heads to be merged - // - // the order of merges does not matter as we only - // expect the latest head to eventually be merged - // - // unexpected merge events are ignored - for len(expect) > 0 { - var evt event.Merge - select { - case msg := <-s.nodeMergeCompleteSubs[nodeID].Message(): - evt = msg.Data.(event.Merge) - case <-ctx.Done(): - s.t.Fatalf("timeout waiting for merge complete event") - } - head, ok := expect[evt.DocID] - if ok && head.String() == evt.Cid.String() { - delete(expect, evt.DocID) - } - } - } -} - func RandomNetworkingConfig() ConfigureNode { return func() []net.NodeOpt { return []net.NodeOpt{ diff --git a/tests/integration/state.go b/tests/integration/state.go index c8d848e588..ee5ea35f12 100644 --- a/tests/integration/state.go +++ b/tests/integration/state.go @@ -25,6 +25,47 @@ import ( "github.com/sourcenetwork/defradb/tests/clients" ) +// eventState contains all event related testing state for a node. +type eventState struct { + // merge is the `event.MergeCompleteName` subscription + merge *event.Subscription + + // update is the `event.UpdateName` subscription + update *event.Subscription + + // replicator is the `event.ReplicatorCompletedName` subscription + replicator *event.Subscription + + // p2pTopic is the `event.P2PTopicCompletedName` subscription + p2pTopic *event.Subscription +} + +// newEventState returns an eventState with all required subscriptions. +func newEventState(bus *event.Bus) (*eventState, error) { + merge, err := bus.Subscribe(event.MergeCompleteName) + if err != nil { + return nil, err + } + update, err := bus.Subscribe(event.UpdateName) + if err != nil { + return nil, err + } + replicator, err := bus.Subscribe(event.ReplicatorCompletedName) + if err != nil { + return nil, err + } + p2pTopic, err := bus.Subscribe(event.P2PTopicCompletedName) + if err != nil { + return nil, err + } + return &eventState{ + merge: merge, + update: update, + replicator: replicator, + p2pTopic: p2pTopic, + }, nil +} + type state struct { // The test context. ctx context.Context @@ -54,11 +95,8 @@ type state struct { // These channels will recieve a function which asserts results of any subscription requests. subscriptionResultsChans []chan func() - // nodeMergeCompleteSubs is a list of all merge complete event subscriptions - nodeMergeCompleteSubs []*event.Subscription - - // nodeUpdateSubs is a list of all update event subscriptions - nodeUpdateSubs []*event.Subscription + // nodeEvents contains all event node subscriptions. + nodeEvents []*eventState // nodeConnections contains all connected nodes. // @@ -141,11 +179,11 @@ func newState( txns: []datastore.Txn{}, allActionsDone: make(chan struct{}), subscriptionResultsChans: []chan func(){}, - nodeMergeCompleteSubs: []*event.Subscription{}, nodeConnections: []map[int]struct{}{}, nodeReplicatorSources: []map[int]struct{}{}, nodeReplicatorTargets: []map[int]struct{}{}, nodePeerCollections: []map[int]struct{}{}, + nodeEvents: []*eventState{}, actualDocHeads: []map[string]cid.Cid{}, expectedDocHeads: []map[string]cid.Cid{}, nodeAddresses: []peer.AddrInfo{}, diff --git a/tests/integration/utils2.go b/tests/integration/utils2.go index f132037cd8..f08c629766 100644 --- a/tests/integration/utils2.go +++ b/tests/integration/utils2.go @@ -343,7 +343,7 @@ func performAction( assertClientIntrospectionResults(s, action) case WaitForSync: - waitForSync(s, action) + waitForMergeEvents(s, action) case Benchmark: benchmarkAction(s, actionIndex, action) @@ -658,7 +658,11 @@ func setStartingNodes( c, err := setupClient(s, node) require.Nil(s.t, err) + eventState, err := newEventState(c.Events()) + require.NoError(s.t, err) + s.nodes = append(s.nodes, c) + s.nodeEvents = append(s.nodeEvents, eventState) s.dbPaths = append(s.dbPaths, path) s.nodeConnections = append(s.nodeConnections, make(map[int]struct{})) @@ -691,6 +695,10 @@ func restartNodes( c, err := setupClient(s, node) require.NoError(s.t, err) s.nodes[i] = c + + eventState, err := newEventState(c.Events()) + require.NoError(s.t, err) + s.nodeEvents[i] = eventState continue } @@ -733,15 +741,9 @@ func restartNodes( require.NoError(s.t, err) s.nodes[i] = c - // subscribe to merge complete events - mergeCompleteSub, err := c.Events().Subscribe(event.MergeCompleteName) + eventState, err := newEventState(c.Events()) require.NoError(s.t, err) - s.nodeMergeCompleteSubs[i] = mergeCompleteSub - - // subscribe to update events - updateSub, err := c.Events().Subscribe(event.UpdateName) - require.NoError(s.t, err) - s.nodeUpdateSubs[i] = updateSub + s.nodeEvents = append(s.nodeEvents, eventState) for waitLen > 0 { select { @@ -828,7 +830,11 @@ func configureNode( c, err := setupClient(s, node) require.NoError(s.t, err) + eventState, err := newEventState(c.Events()) + require.NoError(s.t, err) + s.nodes = append(s.nodes, c) + s.nodeEvents = append(s.nodeEvents, eventState) s.dbPaths = append(s.dbPaths, path) s.nodeConnections = append(s.nodeConnections, make(map[int]struct{})) @@ -836,16 +842,6 @@ func configureNode( s.nodeReplicatorTargets = append(s.nodeReplicatorTargets, make(map[int]struct{})) s.expectedDocHeads = append(s.expectedDocHeads, make(map[string]cid.Cid)) s.actualDocHeads = append(s.actualDocHeads, make(map[string]cid.Cid)) - - // subscribe to merge complete events - mergeCompleteSub, err := c.Events().Subscribe(event.MergeCompleteName) - require.NoError(s.t, err) - s.nodeMergeCompleteSubs = append(s.nodeMergeCompleteSubs, mergeCompleteSub) - - // subscribe to update events - updateSub, err := c.Events().Subscribe(event.UpdateName) - require.NoError(s.t, err) - s.nodeUpdateSubs = append(s.nodeUpdateSubs, updateSub) } func refreshDocuments( @@ -1222,10 +1218,11 @@ func createDoc( // Expand the slice if required, so that the document can be accessed by collection index s.documents = append(s.documents, make([][]*client.Document, action.CollectionID-len(s.documents)+1)...) } + s.documents[action.CollectionID] = append(s.documents[action.CollectionID], docs...) + if action.ExpectedError == "" { waitForUpdateEvents(s, action.NodeID, action.CollectionID) } - s.documents[action.CollectionID] = append(s.documents[action.CollectionID], docs...) } func createDocViaColSave( From 6e6e3f5a6ba798a28ac91006af59122637a9dcd2 Mon Sep 17 00:00:00 2001 From: Keenan Nemetz Date: Tue, 16 Jul 2024 11:36:44 -0700 Subject: [PATCH 05/14] move all event test logic to events file --- tests/integration/events.go | 149 +++++++++++++++++++++++++++++++----- tests/integration/p2p.go | 59 +++----------- tests/integration/utils2.go | 26 +------ 3 files changed, 140 insertions(+), 94 deletions(-) diff --git a/tests/integration/events.go b/tests/integration/events.go index 583bc08e8b..e357959c36 100644 --- a/tests/integration/events.go +++ b/tests/integration/events.go @@ -11,7 +11,6 @@ package tests import ( - "context" "time" "github.com/sourcenetwork/defradb/event" @@ -19,17 +18,133 @@ import ( "github.com/stretchr/testify/require" ) -// waitForUpdateEvents waits for all selected nodes to publish an update event to the local event bus. +// eventTimeout is the default amount of time +// to wait for an event before timing out +const eventTimeout = 1 * time.Second + +// waitForNetworkSetupEvents waits for p2p topic completed and +// replicator completed events to be published on the local node event bus. +func waitForNetworkSetupEvents(s *state, nodeID int) { + cols, err := s.nodes[nodeID].GetAllP2PCollections(s.ctx) + require.NoError(s.t, err) + + reps, err := s.nodes[nodeID].GetAllReplicators(s.ctx) + require.NoError(s.t, err) + + p2pTopicEvents := 0 + replicatorEvents := len(reps) + + // there is only one message for loading of P2P collections + if len(cols) > 0 { + p2pTopicEvents = 1 + } + + for p2pTopicEvents > 0 && replicatorEvents > 0 { + select { + case <-s.nodeEvents[nodeID].replicator.Message(): + replicatorEvents-- + + case <-s.nodeEvents[nodeID].p2pTopic.Message(): + p2pTopicEvents-- + + case <-time.After(eventTimeout): + s.t.Fatalf("timeout waiting for node to be ready") + } + } +} + +// waitForReplicatorConfigureEvent waits for a node to publish a +// replicator completed event on the local event bus. // -// Expected document heads will be updated for all nodes that should receive network merges. -func waitForUpdateEvents( - s *state, - nodeID immutable.Option[int], - collectionID int, -) { - ctx, cancel := context.WithTimeout(s.ctx, subscriptionTimeout*10) - defer cancel() +// Expected document heads will be updated for the targeted node. +func waitForReplicatorConfigureEvent(s *state, cfg ConfigureReplicator) { + select { + case <-s.nodeEvents[cfg.SourceNodeID].replicator.Message(): + // event recieved + + case <-time.After(eventTimeout): + require.Fail(s.t, "timeout waiting for replicator event") + } + // all previous documents should be merged on the subscriber node + for key, val := range s.actualDocHeads[cfg.SourceNodeID] { + s.expectedDocHeads[cfg.TargetNodeID][key] = val + } + s.nodeReplicatorTargets[cfg.TargetNodeID][cfg.SourceNodeID] = struct{}{} + s.nodeReplicatorSources[cfg.SourceNodeID][cfg.TargetNodeID] = struct{}{} +} + +// waitForReplicatorConfigureEvent waits for a node to publish a +// replicator completed event on the local event bus. +func waitForReplicatorDeleteEvent(s *state, cfg DeleteReplicator) { + select { + case <-s.nodeEvents[cfg.SourceNodeID].replicator.Message(): + // event recieved + + case <-time.After(eventTimeout): + require.Fail(s.t, "timeout waiting for replicator event") + } + + delete(s.nodeReplicatorTargets[cfg.TargetNodeID], cfg.SourceNodeID) + delete(s.nodeReplicatorSources[cfg.SourceNodeID], cfg.TargetNodeID) +} + +// waitForSubscribeToCollectionEvent waits for a node to publish a +// p2p topic completed event on the local event bus. +// +// Expected document heads will be updated for the subscriber node. +func waitForSubscribeToCollectionEvent(s *state, action SubscribeToCollection) { + select { + case <-s.nodeEvents[action.NodeID].p2pTopic.Message(): + // event recieved + + case <-time.After(eventTimeout): + require.Fail(s.t, "timeout waiting for p2p topic event") + } + + // update peer collections and expected documents of subscribed node + for _, collectionIndex := range action.CollectionIDs { + if collectionIndex == NonExistentCollectionID { + continue // don't track non existent collections + } + s.nodePeerCollections[collectionIndex][action.NodeID] = struct{}{} + if collectionIndex >= len(s.documents) { + continue // no documents to track + } + // all previous documents should be merged on the subscriber node + for _, doc := range s.documents[collectionIndex] { + for nodeID := range s.nodeConnections[action.NodeID] { + head := s.actualDocHeads[nodeID][doc.ID().String()] + s.expectedDocHeads[action.NodeID][doc.ID().String()] = head + } + } + } +} + +// waitForSubscribeToCollectionEvent waits for a node to publish a +// p2p topic completed event on the local event bus. +func waitForUnsubscribeToCollectionEvent(s *state, action UnsubscribeToCollection) { + select { + case <-s.nodeEvents[action.NodeID].p2pTopic.Message(): + // event recieved + + case <-time.After(eventTimeout): + require.Fail(s.t, "timeout waiting for p2p topic event") + } + + for _, collectionIndex := range action.CollectionIDs { + if collectionIndex == NonExistentCollectionID { + continue // don't track non existent collections + } + delete(s.nodePeerCollections[collectionIndex], action.NodeID) + } +} + +// waitForUpdateEvents waits for all selected nodes to publish an +// update event to the local event bus. +// +// Expected document heads will be updated for any connected nodes. +func waitForUpdateEvents(s *state, nodeID immutable.Option[int], collectionID int) { for i := 0; i < len(s.nodes); i++ { if nodeID.HasValue() && nodeID.Value() != i { continue // node is not selected @@ -40,7 +155,7 @@ func waitForUpdateEvents( case msg := <-s.nodeEvents[i].update.Message(): evt = msg.Data.(event.Update) - case <-ctx.Done(): + case <-time.After(eventTimeout): require.Fail(s.t, "timeout waiting for update event") } @@ -74,20 +189,14 @@ func waitForUpdateEvents( // // Will fail the test if an event is not received within the expected time interval to prevent tests // from running forever. -func waitForMergeEvents( - s *state, - action WaitForSync, -) { +func waitForMergeEvents(s *state, action WaitForSync) { var timeout time.Duration if action.ExpectedTimeout != 0 { timeout = action.ExpectedTimeout } else { - timeout = subscriptionTimeout * 10 + timeout = eventTimeout } - ctx, cancel := context.WithTimeout(s.ctx, timeout) - defer cancel() - for nodeID, expect := range s.expectedDocHeads { // remove any docs that are already merged // up to the expected document head @@ -108,7 +217,7 @@ func waitForMergeEvents( case msg := <-s.nodeEvents[nodeID].merge.Message(): evt = msg.Data.(event.Merge) - case <-ctx.Done(): + case <-time.After(timeout): require.Fail(s.t, "timeout waiting for merge complete event") } diff --git a/tests/integration/p2p.go b/tests/integration/p2p.go index c957b25f90..f1d20d2b1f 100644 --- a/tests/integration/p2p.go +++ b/tests/integration/p2p.go @@ -14,7 +14,6 @@ import ( "time" "github.com/sourcenetwork/defradb/client" - "github.com/sourcenetwork/defradb/event" "github.com/sourcenetwork/defradb/net" "github.com/libp2p/go-libp2p/core/peer" @@ -182,26 +181,15 @@ func configureReplicator( sourceNode := s.nodes[cfg.SourceNodeID] targetNode := s.nodes[cfg.TargetNodeID] - sub, err := sourceNode.Events().Subscribe(event.ReplicatorCompletedName) - require.NoError(s.t, err) - err = sourceNode.SetReplicator(s.ctx, client.Replicator{ + err := sourceNode.SetReplicator(s.ctx, client.Replicator{ Info: targetNode.PeerInfo(), }) - if err == nil { - // wait for the replicator setup to complete - <-sub.Message() - } expectedErrorRaised := AssertError(s.t, s.testCase.Description, err, cfg.ExpectedError) assertExpectedErrorRaised(s.t, s.testCase.Description, cfg.ExpectedError, expectedErrorRaised) if err == nil { - // all previous documents should be merged on the subscriber node - for key, val := range s.actualDocHeads[cfg.SourceNodeID] { - s.expectedDocHeads[cfg.TargetNodeID][key] = val - } - s.nodeReplicatorTargets[cfg.TargetNodeID][cfg.SourceNodeID] = struct{}{} - s.nodeReplicatorSources[cfg.SourceNodeID][cfg.TargetNodeID] = struct{}{} + waitForReplicatorConfigureEvent(s, cfg) } } @@ -212,18 +200,11 @@ func deleteReplicator( sourceNode := s.nodes[cfg.SourceNodeID] targetNode := s.nodes[cfg.TargetNodeID] - sub, err := sourceNode.Events().Subscribe(event.ReplicatorCompletedName) - require.NoError(s.t, err) - err = sourceNode.DeleteReplicator(s.ctx, client.Replicator{ + err := sourceNode.DeleteReplicator(s.ctx, client.Replicator{ Info: targetNode.PeerInfo(), }) - if err == nil { - // wait for the replicator setup to complete - <-sub.Message() - } require.NoError(s.t, err) - delete(s.nodeReplicatorTargets[cfg.TargetNodeID], cfg.SourceNodeID) - delete(s.nodeReplicatorSources[cfg.SourceNodeID], cfg.TargetNodeID) + waitForReplicatorDeleteEvent(s, cfg) } // subscribeToCollection sets up a collection subscription on the given node/collection. @@ -241,28 +222,14 @@ func subscribeToCollection( schemaRoots = append(schemaRoots, NonExistentCollectionSchemaRoot) continue } - if action.ExpectedError == "" { - // all previous documents should be merged on the subscriber node - if collectionIndex < len(s.documents) { - for _, doc := range s.documents[collectionIndex] { - for nodeID := range s.nodeConnections[action.NodeID] { - s.expectedDocHeads[action.NodeID][doc.ID().String()] = s.actualDocHeads[nodeID][doc.ID().String()] - } - } - } - s.nodePeerCollections[collectionIndex][action.NodeID] = struct{}{} - } + col := s.collections[action.NodeID][collectionIndex] schemaRoots = append(schemaRoots, col.SchemaRoot()) } - sub, err := n.Events().Subscribe(event.P2PTopicCompletedName) - require.NoError(s.t, err) - - err = n.AddP2PCollections(s.ctx, schemaRoots) + err := n.AddP2PCollections(s.ctx, schemaRoots) if err == nil { - // wait for the p2p collection setup to complete - <-sub.Message() + waitForSubscribeToCollectionEvent(s, action) } expectedErrorRaised := AssertError(s.t, s.testCase.Description, err, action.ExpectedError) @@ -289,20 +256,14 @@ func unsubscribeToCollection( schemaRoots = append(schemaRoots, NonExistentCollectionSchemaRoot) continue } - if action.ExpectedError == "" { - delete(s.nodePeerCollections[collectionIndex], action.NodeID) - } + col := s.collections[action.NodeID][collectionIndex] schemaRoots = append(schemaRoots, col.SchemaRoot()) } - sub, err := n.Events().Subscribe(event.P2PTopicCompletedName) - require.NoError(s.t, err) - - err = n.RemoveP2PCollections(s.ctx, schemaRoots) + err := n.RemoveP2PCollections(s.ctx, schemaRoots) if err == nil { - // wait for the p2p collection setup to complete - <-sub.Message() + waitForUnsubscribeToCollectionEvent(s, action) } expectedErrorRaised := AssertError(s.t, s.testCase.Description, err, action.ExpectedError) diff --git a/tests/integration/utils2.go b/tests/integration/utils2.go index f08c629766..78da73eea9 100644 --- a/tests/integration/utils2.go +++ b/tests/integration/utils2.go @@ -34,7 +34,6 @@ import ( "github.com/sourcenetwork/defradb/crypto" "github.com/sourcenetwork/defradb/datastore" "github.com/sourcenetwork/defradb/errors" - "github.com/sourcenetwork/defradb/event" "github.com/sourcenetwork/defradb/internal/db" "github.com/sourcenetwork/defradb/internal/encryption" "github.com/sourcenetwork/defradb/internal/request/graphql" @@ -702,22 +701,6 @@ func restartNodes( continue } - // We need to ensure that on restart, the node pubsub is configured before - // we continue with the test. Otherwise, we may miss update events. - readySub, err := node.DB.Events().Subscribe(event.P2PTopicCompletedName, event.ReplicatorCompletedName) - require.NoError(s.t, err) - waitLen := 0 - cols, err := node.DB.GetAllP2PCollections(s.ctx) - require.NoError(s.t, err) - if len(cols) > 0 { - // there is only one message for loading of P2P collections - waitLen++ - } - reps, err := node.DB.GetAllReplicators(s.ctx) - require.NoError(s.t, err) - // there is one message per replicator - waitLen += len(reps) - // We need to make sure the node is configured with its old address, otherwise // a new one may be selected and reconnnection to it will fail. var addresses []string @@ -745,14 +728,7 @@ func restartNodes( require.NoError(s.t, err) s.nodeEvents = append(s.nodeEvents, eventState) - for waitLen > 0 { - select { - case <-readySub.Message(): - waitLen-- - case <-time.After(10 * time.Second): - s.t.Fatalf("timeout waiting for node to be ready") - } - } + waitForNetworkSetupEvents(s, i) } // If the db was restarted we need to refresh the collection definitions as the old instances From f3540b0250704618e5210b9a3b8a5b1dce42db7c Mon Sep 17 00:00:00 2001 From: Keenan Nemetz Date: Tue, 16 Jul 2024 11:45:55 -0700 Subject: [PATCH 06/14] move p2p state to struct --- tests/integration/events.go | 53 ++++++++++++++------------- tests/integration/p2p.go | 4 +- tests/integration/state.go | 73 +++++++++++++++++++------------------ tests/integration/utils2.go | 24 ++++++------ 4 files changed, 78 insertions(+), 76 deletions(-) diff --git a/tests/integration/events.go b/tests/integration/events.go index e357959c36..add4386eba 100644 --- a/tests/integration/events.go +++ b/tests/integration/events.go @@ -13,9 +13,10 @@ package tests import ( "time" - "github.com/sourcenetwork/defradb/event" "github.com/sourcenetwork/immutable" "github.com/stretchr/testify/require" + + "github.com/sourcenetwork/defradb/event" ) // eventTimeout is the default amount of time @@ -48,7 +49,7 @@ func waitForNetworkSetupEvents(s *state, nodeID int) { p2pTopicEvents-- case <-time.After(eventTimeout): - s.t.Fatalf("timeout waiting for node to be ready") + s.t.Fatalf("timeout waiting for network setup events") } } } @@ -67,11 +68,11 @@ func waitForReplicatorConfigureEvent(s *state, cfg ConfigureReplicator) { } // all previous documents should be merged on the subscriber node - for key, val := range s.actualDocHeads[cfg.SourceNodeID] { - s.expectedDocHeads[cfg.TargetNodeID][key] = val + for key, val := range s.p2p.actualDocHeads[cfg.SourceNodeID] { + s.p2p.expectedDocHeads[cfg.TargetNodeID][key] = val } - s.nodeReplicatorTargets[cfg.TargetNodeID][cfg.SourceNodeID] = struct{}{} - s.nodeReplicatorSources[cfg.SourceNodeID][cfg.TargetNodeID] = struct{}{} + s.p2p.replicatorTargets[cfg.TargetNodeID][cfg.SourceNodeID] = struct{}{} + s.p2p.replicatorSources[cfg.SourceNodeID][cfg.TargetNodeID] = struct{}{} } // waitForReplicatorConfigureEvent waits for a node to publish a @@ -85,8 +86,8 @@ func waitForReplicatorDeleteEvent(s *state, cfg DeleteReplicator) { require.Fail(s.t, "timeout waiting for replicator event") } - delete(s.nodeReplicatorTargets[cfg.TargetNodeID], cfg.SourceNodeID) - delete(s.nodeReplicatorSources[cfg.SourceNodeID], cfg.TargetNodeID) + delete(s.p2p.replicatorTargets[cfg.TargetNodeID], cfg.SourceNodeID) + delete(s.p2p.replicatorSources[cfg.SourceNodeID], cfg.TargetNodeID) } // waitForSubscribeToCollectionEvent waits for a node to publish a @@ -107,15 +108,15 @@ func waitForSubscribeToCollectionEvent(s *state, action SubscribeToCollection) { if collectionIndex == NonExistentCollectionID { continue // don't track non existent collections } - s.nodePeerCollections[collectionIndex][action.NodeID] = struct{}{} + s.p2p.peerCollections[collectionIndex][action.NodeID] = struct{}{} if collectionIndex >= len(s.documents) { continue // no documents to track } // all previous documents should be merged on the subscriber node for _, doc := range s.documents[collectionIndex] { - for nodeID := range s.nodeConnections[action.NodeID] { - head := s.actualDocHeads[nodeID][doc.ID().String()] - s.expectedDocHeads[action.NodeID][doc.ID().String()] = head + for nodeID := range s.p2p.connections[action.NodeID] { + head := s.p2p.actualDocHeads[nodeID][doc.ID().String()] + s.p2p.expectedDocHeads[action.NodeID][doc.ID().String()] = head } } } @@ -136,7 +137,7 @@ func waitForUnsubscribeToCollectionEvent(s *state, action UnsubscribeToCollectio if collectionIndex == NonExistentCollectionID { continue // don't track non existent collections } - delete(s.nodePeerCollections[collectionIndex], action.NodeID) + delete(s.p2p.peerCollections[collectionIndex], action.NodeID) } } @@ -160,27 +161,27 @@ func waitForUpdateEvents(s *state, nodeID immutable.Option[int], collectionID in } // update the actual document heads - s.actualDocHeads[i][evt.DocID] = evt.Cid + s.p2p.actualDocHeads[i][evt.DocID] = evt.Cid // update the expected document heads of connected nodes - for id := range s.nodeConnections[i] { - if _, ok := s.actualDocHeads[id][evt.DocID]; ok { - s.expectedDocHeads[id][evt.DocID] = evt.Cid + for id := range s.p2p.connections[i] { + if _, ok := s.p2p.actualDocHeads[id][evt.DocID]; ok { + s.p2p.expectedDocHeads[id][evt.DocID] = evt.Cid } } // update the expected document heads of replicator sources - for id := range s.nodeReplicatorTargets[i] { - if _, ok := s.actualDocHeads[id][evt.DocID]; ok { - s.expectedDocHeads[id][evt.DocID] = evt.Cid + for id := range s.p2p.replicatorTargets[i] { + if _, ok := s.p2p.actualDocHeads[id][evt.DocID]; ok { + s.p2p.expectedDocHeads[id][evt.DocID] = evt.Cid } } // update the expected document heads of replicator targets - for id := range s.nodeReplicatorSources[i] { - s.expectedDocHeads[id][evt.DocID] = evt.Cid + for id := range s.p2p.replicatorSources[i] { + s.p2p.expectedDocHeads[id][evt.DocID] = evt.Cid } // update the expected document heads of peer collection subs - for id := range s.nodePeerCollections[collectionID] { - s.expectedDocHeads[id][evt.DocID] = evt.Cid + for id := range s.p2p.peerCollections[collectionID] { + s.p2p.expectedDocHeads[id][evt.DocID] = evt.Cid } } } @@ -197,10 +198,10 @@ func waitForMergeEvents(s *state, action WaitForSync) { timeout = eventTimeout } - for nodeID, expect := range s.expectedDocHeads { + for nodeID, expect := range s.p2p.expectedDocHeads { // remove any docs that are already merged // up to the expected document head - for key, val := range s.actualDocHeads[nodeID] { + for key, val := range s.p2p.actualDocHeads[nodeID] { if head, ok := expect[key]; ok && head.String() == val.String() { delete(expect, key) } diff --git a/tests/integration/p2p.go b/tests/integration/p2p.go index f1d20d2b1f..aeee489468 100644 --- a/tests/integration/p2p.go +++ b/tests/integration/p2p.go @@ -157,8 +157,8 @@ func connectPeers( log.InfoContext(s.ctx, "Bootstrapping with peers", corelog.Any("Addresses", addrs)) sourceNode.Bootstrap(addrs) - s.nodeConnections[cfg.SourceNodeID][cfg.TargetNodeID] = struct{}{} - s.nodeConnections[cfg.TargetNodeID][cfg.SourceNodeID] = struct{}{} + s.p2p.connections[cfg.SourceNodeID][cfg.TargetNodeID] = struct{}{} + s.p2p.connections[cfg.TargetNodeID][cfg.SourceNodeID] = struct{}{} // Bootstrap triggers a bunch of async stuff for which we have no good way of waiting on. It must be // allowed to complete before documentation begins or it will not even try and sync it. So for now, we diff --git a/tests/integration/state.go b/tests/integration/state.go index ee5ea35f12..c9847e5aa6 100644 --- a/tests/integration/state.go +++ b/tests/integration/state.go @@ -25,6 +25,39 @@ import ( "github.com/sourcenetwork/defradb/tests/clients" ) +// p2pState contains all p2p related testing test. +type p2pState struct { + // connections contains all connected nodes. + // + // The index of the slice is the node id. The map key is the connected node id. + connections []map[int]struct{} + + // replicatorSources is a mapping of replicator sources to targets. + // + // The index of the slice is the source node id. The map key is the target node id. + replicatorSources []map[int]struct{} + + // replicatorTargets is a mapping of replicator targets to sources. + // + // The index of the slice is the target node id. The map key is the source node id. + replicatorTargets []map[int]struct{} + + // peerCollections contains all active peer collection subscriptions. + // + // The index of the slice is the collection id. The map key is the node id of the subscriber. + peerCollections []map[int]struct{} + + // actualDocHeads contains all document heads that exist on a node. + // + // The index of the slice is the node id. The map key is the doc id. The map value is the doc head. + actualDocHeads []map[string]cid.Cid + + // expectedDocHeads contains all document heads that are expected to exist on a node. + // + // The index of the slice is the node id. The map key is the doc id. The map value is the doc head. + expectedDocHeads []map[string]cid.Cid +} + // eventState contains all event related testing state for a node. type eventState struct { // merge is the `event.MergeCompleteName` subscription @@ -98,36 +131,6 @@ type state struct { // nodeEvents contains all event node subscriptions. nodeEvents []*eventState - // nodeConnections contains all connected nodes. - // - // The index of the slice is the node id. The map key is the connected node id. - nodeConnections []map[int]struct{} - - // nodeReplicatorSources contains all active replicators. - // - // The index of the slice is the source node id. The map key is the target node id. - nodeReplicatorSources []map[int]struct{} - - // nodeReplicatorTargets contains all active replicators. - // - // The index of the slice is the target node id. The map key is the source node id. - nodeReplicatorTargets []map[int]struct{} - - // nodePeerCollections contains all active peer collection subscriptions. - // - // The index of the slice is the collection id. The map key is the node id of the subscriber. - nodePeerCollections []map[int]struct{} - - // actualDocHeads contains all document heads that exist on a node. - // - // The index of the slice is the node id. The map key is the doc id. The map value is the doc head. - actualDocHeads []map[string]cid.Cid - - // expectedDocHeads contains all document heads that are expected to exist on a node. - // - // The index of the slice is the node id. The map key is the doc id. The map value is the doc head. - expectedDocHeads []map[string]cid.Cid - // The addresses of any nodes configured. nodeAddresses []peer.AddrInfo @@ -137,6 +140,9 @@ type state struct { // The nodes active in this test. nodes []clients.Client + // The p2p test state. + p2p *p2pState + // The paths to any file-based databases active in this test. dbPaths []string @@ -179,16 +185,11 @@ func newState( txns: []datastore.Txn{}, allActionsDone: make(chan struct{}), subscriptionResultsChans: []chan func(){}, - nodeConnections: []map[int]struct{}{}, - nodeReplicatorSources: []map[int]struct{}{}, - nodeReplicatorTargets: []map[int]struct{}{}, - nodePeerCollections: []map[int]struct{}{}, nodeEvents: []*eventState{}, - actualDocHeads: []map[string]cid.Cid{}, - expectedDocHeads: []map[string]cid.Cid{}, nodeAddresses: []peer.AddrInfo{}, nodeConfigs: [][]net.NodeOpt{}, nodes: []clients.Client{}, + p2p: &p2pState{}, dbPaths: []string{}, collections: [][]client.Collection{}, collectionNames: collectionNames, diff --git a/tests/integration/utils2.go b/tests/integration/utils2.go index 78da73eea9..798f66b2b3 100644 --- a/tests/integration/utils2.go +++ b/tests/integration/utils2.go @@ -664,11 +664,11 @@ func setStartingNodes( s.nodeEvents = append(s.nodeEvents, eventState) s.dbPaths = append(s.dbPaths, path) - s.nodeConnections = append(s.nodeConnections, make(map[int]struct{})) - s.nodeReplicatorSources = append(s.nodeReplicatorSources, make(map[int]struct{})) - s.nodeReplicatorTargets = append(s.nodeReplicatorTargets, make(map[int]struct{})) - s.expectedDocHeads = append(s.expectedDocHeads, make(map[string]cid.Cid)) - s.actualDocHeads = append(s.actualDocHeads, make(map[string]cid.Cid)) + s.p2p.connections = append(s.p2p.connections, make(map[int]struct{})) + s.p2p.replicatorSources = append(s.p2p.replicatorSources, make(map[int]struct{})) + s.p2p.replicatorTargets = append(s.p2p.replicatorTargets, make(map[int]struct{})) + s.p2p.expectedDocHeads = append(s.p2p.expectedDocHeads, make(map[string]cid.Cid)) + s.p2p.actualDocHeads = append(s.p2p.actualDocHeads, make(map[string]cid.Cid)) } } @@ -746,8 +746,8 @@ func refreshCollections( ) { s.collections = make([][]client.Collection, len(s.nodes)) - for i := len(s.nodePeerCollections); i < len(s.collectionNames); i++ { - s.nodePeerCollections = append(s.nodePeerCollections, make(map[int]struct{})) + for i := len(s.p2p.peerCollections); i < len(s.collectionNames); i++ { + s.p2p.peerCollections = append(s.p2p.peerCollections, make(map[int]struct{})) } for nodeID, node := range s.nodes { @@ -813,11 +813,11 @@ func configureNode( s.nodeEvents = append(s.nodeEvents, eventState) s.dbPaths = append(s.dbPaths, path) - s.nodeConnections = append(s.nodeConnections, make(map[int]struct{})) - s.nodeReplicatorSources = append(s.nodeReplicatorSources, make(map[int]struct{})) - s.nodeReplicatorTargets = append(s.nodeReplicatorTargets, make(map[int]struct{})) - s.expectedDocHeads = append(s.expectedDocHeads, make(map[string]cid.Cid)) - s.actualDocHeads = append(s.actualDocHeads, make(map[string]cid.Cid)) + s.p2p.connections = append(s.p2p.connections, make(map[int]struct{})) + s.p2p.replicatorSources = append(s.p2p.replicatorSources, make(map[int]struct{})) + s.p2p.replicatorTargets = append(s.p2p.replicatorTargets, make(map[int]struct{})) + s.p2p.expectedDocHeads = append(s.p2p.expectedDocHeads, make(map[string]cid.Cid)) + s.p2p.actualDocHeads = append(s.p2p.actualDocHeads, make(map[string]cid.Cid)) } func refreshDocuments( From 4b88f6ba43af79d4f06c500947a9e7be0cc80593 Mon Sep 17 00:00:00 2001 From: Keenan Nemetz Date: Tue, 16 Jul 2024 13:39:54 -0700 Subject: [PATCH 07/14] simplify p2p test state --- tests/integration/events.go | 64 ++++++++++++++++++++++--------------- tests/integration/p2p.go | 4 +-- tests/integration/state.go | 42 +++++++++++++++--------- tests/integration/utils2.go | 19 ++--------- 4 files changed, 69 insertions(+), 60 deletions(-) diff --git a/tests/integration/events.go b/tests/integration/events.go index add4386eba..db5e5e4dcb 100644 --- a/tests/integration/events.go +++ b/tests/integration/events.go @@ -68,11 +68,11 @@ func waitForReplicatorConfigureEvent(s *state, cfg ConfigureReplicator) { } // all previous documents should be merged on the subscriber node - for key, val := range s.p2p.actualDocHeads[cfg.SourceNodeID] { - s.p2p.expectedDocHeads[cfg.TargetNodeID][key] = val + for key, val := range s.nodeP2P[cfg.SourceNodeID].actualDocHeads { + s.nodeP2P[cfg.TargetNodeID].expectedDocHeads[key] = val } - s.p2p.replicatorTargets[cfg.TargetNodeID][cfg.SourceNodeID] = struct{}{} - s.p2p.replicatorSources[cfg.SourceNodeID][cfg.TargetNodeID] = struct{}{} + s.nodeP2P[cfg.TargetNodeID].replicatorTargets[cfg.SourceNodeID] = struct{}{} + s.nodeP2P[cfg.SourceNodeID].replicatorSources[cfg.TargetNodeID] = struct{}{} } // waitForReplicatorConfigureEvent waits for a node to publish a @@ -86,8 +86,8 @@ func waitForReplicatorDeleteEvent(s *state, cfg DeleteReplicator) { require.Fail(s.t, "timeout waiting for replicator event") } - delete(s.p2p.replicatorTargets[cfg.TargetNodeID], cfg.SourceNodeID) - delete(s.p2p.replicatorSources[cfg.SourceNodeID], cfg.TargetNodeID) + delete(s.nodeP2P[cfg.TargetNodeID].replicatorTargets, cfg.SourceNodeID) + delete(s.nodeP2P[cfg.SourceNodeID].replicatorSources, cfg.TargetNodeID) } // waitForSubscribeToCollectionEvent waits for a node to publish a @@ -108,16 +108,13 @@ func waitForSubscribeToCollectionEvent(s *state, action SubscribeToCollection) { if collectionIndex == NonExistentCollectionID { continue // don't track non existent collections } - s.p2p.peerCollections[collectionIndex][action.NodeID] = struct{}{} + s.nodeP2P[action.NodeID].peerCollections[collectionIndex] = struct{}{} if collectionIndex >= len(s.documents) { continue // no documents to track } // all previous documents should be merged on the subscriber node for _, doc := range s.documents[collectionIndex] { - for nodeID := range s.p2p.connections[action.NodeID] { - head := s.p2p.actualDocHeads[nodeID][doc.ID().String()] - s.p2p.expectedDocHeads[action.NodeID][doc.ID().String()] = head - } + s.nodeP2P[action.NodeID].expectedDocHeads[doc.ID().String()] = doc.Head() } } } @@ -137,7 +134,7 @@ func waitForUnsubscribeToCollectionEvent(s *state, action UnsubscribeToCollectio if collectionIndex == NonExistentCollectionID { continue // don't track non existent collections } - delete(s.p2p.peerCollections[collectionIndex], action.NodeID) + delete(s.nodeP2P[action.NodeID].peerCollections, collectionIndex) } } @@ -160,28 +157,41 @@ func waitForUpdateEvents(s *state, nodeID immutable.Option[int], collectionID in require.Fail(s.t, "timeout waiting for update event") } - // update the actual document heads - s.p2p.actualDocHeads[i][evt.DocID] = evt.Cid + // update the actual document head on the node that updated it + s.nodeP2P[i].actualDocHeads[evt.DocID] = evt.Cid // update the expected document heads of connected nodes - for id := range s.p2p.connections[i] { - if _, ok := s.p2p.actualDocHeads[id][evt.DocID]; ok { - s.p2p.expectedDocHeads[id][evt.DocID] = evt.Cid + // + // connected nodes share updates of documents they have in common + for id := range s.nodeP2P[i].connections { + if _, ok := s.nodeP2P[id].actualDocHeads[evt.DocID]; ok { + s.nodeP2P[id].expectedDocHeads[evt.DocID] = evt.Cid } } + // update the expected document heads of replicator sources - for id := range s.p2p.replicatorTargets[i] { - if _, ok := s.p2p.actualDocHeads[id][evt.DocID]; ok { - s.p2p.expectedDocHeads[id][evt.DocID] = evt.Cid + // + // replicator source nodes receive updates from target nodes + for id := range s.nodeP2P[i].replicatorTargets { + if _, ok := s.nodeP2P[id].actualDocHeads[evt.DocID]; ok { + s.nodeP2P[id].expectedDocHeads[evt.DocID] = evt.Cid } } + // update the expected document heads of replicator targets - for id := range s.p2p.replicatorSources[i] { - s.p2p.expectedDocHeads[id][evt.DocID] = evt.Cid + // + // replicator target nodes push updates to source nodes + for id := range s.nodeP2P[i].replicatorSources { + s.nodeP2P[id].expectedDocHeads[evt.DocID] = evt.Cid } + // update the expected document heads of peer collection subs - for id := range s.p2p.peerCollections[collectionID] { - s.p2p.expectedDocHeads[id][evt.DocID] = evt.Cid + // + // peer collection subscribers receive updates from any other subscriber node + for id := range s.nodes { + if _, ok := s.nodeP2P[id].peerCollections[collectionID]; ok { + s.nodeP2P[id].expectedDocHeads[evt.DocID] = evt.Cid + } } } } @@ -198,10 +208,12 @@ func waitForMergeEvents(s *state, action WaitForSync) { timeout = eventTimeout } - for nodeID, expect := range s.p2p.expectedDocHeads { + for nodeID := 0; nodeID < len(s.nodes); nodeID++ { + expect := s.nodeP2P[nodeID].expectedDocHeads + // remove any docs that are already merged // up to the expected document head - for key, val := range s.p2p.actualDocHeads[nodeID] { + for key, val := range s.nodeP2P[nodeID].actualDocHeads { if head, ok := expect[key]; ok && head.String() == val.String() { delete(expect, key) } diff --git a/tests/integration/p2p.go b/tests/integration/p2p.go index aeee489468..ae34892c04 100644 --- a/tests/integration/p2p.go +++ b/tests/integration/p2p.go @@ -157,8 +157,8 @@ func connectPeers( log.InfoContext(s.ctx, "Bootstrapping with peers", corelog.Any("Addresses", addrs)) sourceNode.Bootstrap(addrs) - s.p2p.connections[cfg.SourceNodeID][cfg.TargetNodeID] = struct{}{} - s.p2p.connections[cfg.TargetNodeID][cfg.SourceNodeID] = struct{}{} + s.nodeP2P[cfg.SourceNodeID].connections[cfg.TargetNodeID] = struct{}{} + s.nodeP2P[cfg.TargetNodeID].connections[cfg.SourceNodeID] = struct{}{} // Bootstrap triggers a bunch of async stuff for which we have no good way of waiting on. It must be // allowed to complete before documentation begins or it will not even try and sync it. So for now, we diff --git a/tests/integration/state.go b/tests/integration/state.go index c9847e5aa6..a51ed1dd9b 100644 --- a/tests/integration/state.go +++ b/tests/integration/state.go @@ -29,33 +29,45 @@ import ( type p2pState struct { // connections contains all connected nodes. // - // The index of the slice is the node id. The map key is the connected node id. - connections []map[int]struct{} + // The map key is the connected node id. + connections map[int]struct{} // replicatorSources is a mapping of replicator sources to targets. // - // The index of the slice is the source node id. The map key is the target node id. - replicatorSources []map[int]struct{} + // The map key is the target node id. + replicatorSources map[int]struct{} // replicatorTargets is a mapping of replicator targets to sources. // - // The index of the slice is the target node id. The map key is the source node id. - replicatorTargets []map[int]struct{} + // The map key is the source node id. + replicatorTargets map[int]struct{} // peerCollections contains all active peer collection subscriptions. // - // The index of the slice is the collection id. The map key is the node id of the subscriber. - peerCollections []map[int]struct{} + // The map key is the node id of the subscriber. + peerCollections map[int]struct{} // actualDocHeads contains all document heads that exist on a node. // - // The index of the slice is the node id. The map key is the doc id. The map value is the doc head. - actualDocHeads []map[string]cid.Cid + // The map key is the doc id. The map value is the doc head. + actualDocHeads map[string]cid.Cid // expectedDocHeads contains all document heads that are expected to exist on a node. // - // The index of the slice is the node id. The map key is the doc id. The map value is the doc head. - expectedDocHeads []map[string]cid.Cid + // The map key is the doc id. The map value is the doc head. + expectedDocHeads map[string]cid.Cid +} + +// newP2PState returns a new empty p2p state. +func newP2PState() *p2pState { + return &p2pState{ + connections: make(map[int]struct{}), + replicatorSources: make(map[int]struct{}), + replicatorTargets: make(map[int]struct{}), + peerCollections: make(map[int]struct{}), + actualDocHeads: make(map[string]cid.Cid), + expectedDocHeads: make(map[string]cid.Cid), + } } // eventState contains all event related testing state for a node. @@ -140,8 +152,8 @@ type state struct { // The nodes active in this test. nodes []clients.Client - // The p2p test state. - p2p *p2pState + // nodeP2P contains p2p states for all nodes + nodeP2P []*p2pState // The paths to any file-based databases active in this test. dbPaths []string @@ -188,8 +200,8 @@ func newState( nodeEvents: []*eventState{}, nodeAddresses: []peer.AddrInfo{}, nodeConfigs: [][]net.NodeOpt{}, + nodeP2P: []*p2pState{}, nodes: []clients.Client{}, - p2p: &p2pState{}, dbPaths: []string{}, collections: [][]client.Collection{}, collectionNames: collectionNames, diff --git a/tests/integration/utils2.go b/tests/integration/utils2.go index 798f66b2b3..9695371425 100644 --- a/tests/integration/utils2.go +++ b/tests/integration/utils2.go @@ -23,7 +23,6 @@ import ( "github.com/bxcodec/faker/support/slice" "github.com/fxamacker/cbor/v2" - "github.com/ipfs/go-cid" "github.com/sourcenetwork/corelog" "github.com/sourcenetwork/immutable" "github.com/stretchr/testify/assert" @@ -662,13 +661,8 @@ func setStartingNodes( s.nodes = append(s.nodes, c) s.nodeEvents = append(s.nodeEvents, eventState) + s.nodeP2P = append(s.nodeP2P, newP2PState()) s.dbPaths = append(s.dbPaths, path) - - s.p2p.connections = append(s.p2p.connections, make(map[int]struct{})) - s.p2p.replicatorSources = append(s.p2p.replicatorSources, make(map[int]struct{})) - s.p2p.replicatorTargets = append(s.p2p.replicatorTargets, make(map[int]struct{})) - s.p2p.expectedDocHeads = append(s.p2p.expectedDocHeads, make(map[string]cid.Cid)) - s.p2p.actualDocHeads = append(s.p2p.actualDocHeads, make(map[string]cid.Cid)) } } @@ -746,10 +740,6 @@ func refreshCollections( ) { s.collections = make([][]client.Collection, len(s.nodes)) - for i := len(s.p2p.peerCollections); i < len(s.collectionNames); i++ { - s.p2p.peerCollections = append(s.p2p.peerCollections, make(map[int]struct{})) - } - for nodeID, node := range s.nodes { s.collections[nodeID] = make([]client.Collection, len(s.collectionNames)) allCollections, err := node.GetCollections(s.ctx, client.CollectionFetchOptions{}) @@ -811,13 +801,8 @@ func configureNode( s.nodes = append(s.nodes, c) s.nodeEvents = append(s.nodeEvents, eventState) + s.nodeP2P = append(s.nodeP2P, newP2PState()) s.dbPaths = append(s.dbPaths, path) - - s.p2p.connections = append(s.p2p.connections, make(map[int]struct{})) - s.p2p.replicatorSources = append(s.p2p.replicatorSources, make(map[int]struct{})) - s.p2p.replicatorTargets = append(s.p2p.replicatorTargets, make(map[int]struct{})) - s.p2p.expectedDocHeads = append(s.p2p.expectedDocHeads, make(map[string]cid.Cid)) - s.p2p.actualDocHeads = append(s.p2p.actualDocHeads, make(map[string]cid.Cid)) } func refreshDocuments( From 6e136a1f6920e228a48e4c3fa9e61dff981a4c89 Mon Sep 17 00:00:00 2001 From: Keenan Nemetz Date: Tue, 16 Jul 2024 13:54:34 -0700 Subject: [PATCH 08/14] make peer connections deterministic. remove p2p sleeps --- net/peer.go | 5 +++++ tests/clients/cli/wrapper.go | 4 ++-- tests/clients/clients.go | 4 +++- tests/clients/http/wrapper.go | 4 ++-- tests/integration/client.go | 6 ++++-- tests/integration/events.go | 2 +- tests/integration/p2p.go | 29 ++++++----------------------- 7 files changed, 23 insertions(+), 31 deletions(-) diff --git a/net/peer.go b/net/peer.go index 00ea8653a0..768a255fc3 100644 --- a/net/peer.go +++ b/net/peer.go @@ -504,6 +504,11 @@ func stopGRPCServer(ctx context.Context, server *grpc.Server) { } } +// Connect initiates a connection to the peer with the given address. +func (p *Peer) Connect(ctx context.Context, addr peer.AddrInfo) error { + return p.host.Connect(ctx, addr) +} + // Bootstrap connects to the given peers. func (p *Peer) Bootstrap(addrs []peer.AddrInfo) { var connected uint64 diff --git a/tests/clients/cli/wrapper.go b/tests/clients/cli/wrapper.go index a6f3feef4c..d4976420b5 100644 --- a/tests/clients/cli/wrapper.go +++ b/tests/clients/cli/wrapper.go @@ -537,6 +537,6 @@ func (w *Wrapper) PrintDump(ctx context.Context) error { return w.node.DB.PrintDump(ctx) } -func (w *Wrapper) Bootstrap(addrs []peer.AddrInfo) { - w.node.Peer.Bootstrap(addrs) +func (w *Wrapper) Connect(ctx context.Context, addr peer.AddrInfo) error { + return w.node.Peer.Connect(ctx, addr) } diff --git a/tests/clients/clients.go b/tests/clients/clients.go index f5d822ab39..2a67ed0812 100644 --- a/tests/clients/clients.go +++ b/tests/clients/clients.go @@ -11,6 +11,8 @@ package clients import ( + "context" + "github.com/libp2p/go-libp2p/core/peer" "github.com/sourcenetwork/defradb/client" @@ -20,5 +22,5 @@ import ( // required for testing. type Client interface { client.DB - Bootstrap([]peer.AddrInfo) + Connect(context.Context, peer.AddrInfo) error } diff --git a/tests/clients/http/wrapper.go b/tests/clients/http/wrapper.go index f183a4671a..7f45671b57 100644 --- a/tests/clients/http/wrapper.go +++ b/tests/clients/http/wrapper.go @@ -233,6 +233,6 @@ func (w *Wrapper) PrintDump(ctx context.Context) error { return w.node.DB.PrintDump(ctx) } -func (w *Wrapper) Bootstrap(addrs []peer.AddrInfo) { - w.node.Peer.Bootstrap(addrs) +func (w *Wrapper) Connect(ctx context.Context, addr peer.AddrInfo) error { + return w.node.Peer.Connect(ctx, addr) } diff --git a/tests/integration/client.go b/tests/integration/client.go index dee5a3f0c9..aae14080a2 100644 --- a/tests/integration/client.go +++ b/tests/integration/client.go @@ -11,6 +11,7 @@ package tests import ( + "context" "fmt" "os" "strconv" @@ -100,10 +101,11 @@ func newGoClientWrapper(n *node.Node) *goClientWrapper { } } -func (w *goClientWrapper) Bootstrap(addrs []peer.AddrInfo) { +func (w *goClientWrapper) Connect(ctx context.Context, addr peer.AddrInfo) error { if w.peer != nil { - w.peer.Bootstrap(addrs) + return w.peer.Connect(ctx, addr) } + return nil } func (w *goClientWrapper) Close() { diff --git a/tests/integration/events.go b/tests/integration/events.go index db5e5e4dcb..d1d305c234 100644 --- a/tests/integration/events.go +++ b/tests/integration/events.go @@ -21,7 +21,7 @@ import ( // eventTimeout is the default amount of time // to wait for an event before timing out -const eventTimeout = 1 * time.Second +const eventTimeout = 5 * time.Second // waitForNetworkSetupEvents waits for p2p topic completed and // replicator completed events to be published on the local node event bus. diff --git a/tests/integration/p2p.go b/tests/integration/p2p.go index ae34892c04..38b56019d0 100644 --- a/tests/integration/p2p.go +++ b/tests/integration/p2p.go @@ -147,23 +147,19 @@ func connectPeers( s *state, cfg ConnectPeers, ) { - // If we have some database actions prior to connecting the peers, we want to ensure that they had time to - // complete before we connect. Otherwise we might wrongly catch them in our wait function. - time.Sleep(100 * time.Millisecond) sourceNode := s.nodes[cfg.SourceNodeID] targetNode := s.nodes[cfg.TargetNodeID] addrs := []peer.AddrInfo{targetNode.PeerInfo()} - log.InfoContext(s.ctx, "Bootstrapping with peers", corelog.Any("Addresses", addrs)) - sourceNode.Bootstrap(addrs) + log.InfoContext(s.ctx, "Connecting to peers", corelog.Any("Addresses", addrs)) + + for _, addr := range addrs { + err := sourceNode.Connect(s.ctx, addr) + require.NoError(s.t, err) + } s.nodeP2P[cfg.SourceNodeID].connections[cfg.TargetNodeID] = struct{}{} s.nodeP2P[cfg.TargetNodeID].connections[cfg.SourceNodeID] = struct{}{} - - // Bootstrap triggers a bunch of async stuff for which we have no good way of waiting on. It must be - // allowed to complete before documentation begins or it will not even try and sync it. So for now, we - // sleep a little. - time.Sleep(100 * time.Millisecond) } // configureReplicator configures a replicator relationship between two existing, started, nodes. @@ -175,9 +171,6 @@ func configureReplicator( s *state, cfg ConfigureReplicator, ) { - // If we have some database actions prior to configuring the replicator, we want to ensure that they had time to - // complete before the configuration. Otherwise we might wrongly catch them in our wait function. - time.Sleep(100 * time.Millisecond) sourceNode := s.nodes[cfg.SourceNodeID] targetNode := s.nodes[cfg.TargetNodeID] @@ -234,11 +227,6 @@ func subscribeToCollection( expectedErrorRaised := AssertError(s.t, s.testCase.Description, err, action.ExpectedError) assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised) - - // The `n.Peer.AddP2PCollections(colIDs)` call above is calling some asynchronous functions - // for the pubsub subscription and those functions can take a bit of time to complete, - // we need to make sure this has finished before progressing. - time.Sleep(100 * time.Millisecond) } // unsubscribeToCollection removes the given collections from subscriptions on the given nodes. @@ -268,11 +256,6 @@ func unsubscribeToCollection( expectedErrorRaised := AssertError(s.t, s.testCase.Description, err, action.ExpectedError) assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised) - - // The `n.Peer.RemoveP2PCollections(colIDs)` call above is calling some asynchronous functions - // for the pubsub subscription and those functions can take a bit of time to complete, - // we need to make sure this has finished before progressing. - time.Sleep(100 * time.Millisecond) } // getAllP2PCollections gets all the active peer subscriptions and compares them against the From 25ee3a483ba6d5e01ed64a558856125ca1eb9f0a Mon Sep 17 00:00:00 2001 From: Keenan Nemetz Date: Tue, 16 Jul 2024 22:12:19 -0700 Subject: [PATCH 09/14] handle closed subscriptions --- tests/integration/events.go | 114 ++++++++++++++++++------------------ tests/integration/p2p.go | 18 +++--- tests/integration/state.go | 20 +++---- tests/integration/utils2.go | 6 +- 4 files changed, 78 insertions(+), 80 deletions(-) diff --git a/tests/integration/events.go b/tests/integration/events.go index d1d305c234..7f7b29a7b3 100644 --- a/tests/integration/events.go +++ b/tests/integration/events.go @@ -11,6 +11,7 @@ package tests import ( + "context" "time" "github.com/sourcenetwork/immutable" @@ -19,9 +20,9 @@ import ( "github.com/sourcenetwork/defradb/event" ) -// eventTimeout is the default amount of time -// to wait for an event before timing out -const eventTimeout = 5 * time.Second +// eventTimeout is the amount of time to wait +// for an event before timing out +const eventTimeout = 1 * time.Second // waitForNetworkSetupEvents waits for p2p topic completed and // replicator completed events to be published on the local node event bus. @@ -42,10 +43,16 @@ func waitForNetworkSetupEvents(s *state, nodeID int) { for p2pTopicEvents > 0 && replicatorEvents > 0 { select { - case <-s.nodeEvents[nodeID].replicator.Message(): + case _, ok := <-s.nodeEvents[nodeID].replicator.Message(): + if !ok { + require.Fail(s.t, "subscription closed waiting for network setup events") + } replicatorEvents-- - case <-s.nodeEvents[nodeID].p2pTopic.Message(): + case _, ok := <-s.nodeEvents[nodeID].p2pTopic.Message(): + if !ok { + require.Fail(s.t, "subscription closed waiting for network setup events") + } p2pTopicEvents-- case <-time.After(eventTimeout): @@ -60,8 +67,10 @@ func waitForNetworkSetupEvents(s *state, nodeID int) { // Expected document heads will be updated for the targeted node. func waitForReplicatorConfigureEvent(s *state, cfg ConfigureReplicator) { select { - case <-s.nodeEvents[cfg.SourceNodeID].replicator.Message(): - // event recieved + case _, ok := <-s.nodeEvents[cfg.SourceNodeID].replicator.Message(): + if !ok { + require.Fail(s.t, "subscription closed waiting for replicator event") + } case <-time.After(eventTimeout): require.Fail(s.t, "timeout waiting for replicator event") @@ -71,23 +80,27 @@ func waitForReplicatorConfigureEvent(s *state, cfg ConfigureReplicator) { for key, val := range s.nodeP2P[cfg.SourceNodeID].actualDocHeads { s.nodeP2P[cfg.TargetNodeID].expectedDocHeads[key] = val } - s.nodeP2P[cfg.TargetNodeID].replicatorTargets[cfg.SourceNodeID] = struct{}{} - s.nodeP2P[cfg.SourceNodeID].replicatorSources[cfg.TargetNodeID] = struct{}{} + + // update node connections and replicators + s.nodeP2P[cfg.TargetNodeID].connections[cfg.SourceNodeID] = struct{}{} + s.nodeP2P[cfg.SourceNodeID].connections[cfg.TargetNodeID] = struct{}{} + s.nodeP2P[cfg.SourceNodeID].replicators[cfg.TargetNodeID] = struct{}{} } // waitForReplicatorConfigureEvent waits for a node to publish a // replicator completed event on the local event bus. func waitForReplicatorDeleteEvent(s *state, cfg DeleteReplicator) { select { - case <-s.nodeEvents[cfg.SourceNodeID].replicator.Message(): - // event recieved + case _, ok := <-s.nodeEvents[cfg.SourceNodeID].replicator.Message(): + if !ok { + require.Fail(s.t, "subscription closed waiting for replicator event") + } case <-time.After(eventTimeout): require.Fail(s.t, "timeout waiting for replicator event") } - delete(s.nodeP2P[cfg.TargetNodeID].replicatorTargets, cfg.SourceNodeID) - delete(s.nodeP2P[cfg.SourceNodeID].replicatorSources, cfg.TargetNodeID) + delete(s.nodeP2P[cfg.SourceNodeID].replicators, cfg.TargetNodeID) } // waitForSubscribeToCollectionEvent waits for a node to publish a @@ -96,26 +109,21 @@ func waitForReplicatorDeleteEvent(s *state, cfg DeleteReplicator) { // Expected document heads will be updated for the subscriber node. func waitForSubscribeToCollectionEvent(s *state, action SubscribeToCollection) { select { - case <-s.nodeEvents[action.NodeID].p2pTopic.Message(): - // event recieved + case _, ok := <-s.nodeEvents[action.NodeID].p2pTopic.Message(): + if !ok { + require.Fail(s.t, "subscription closed waiting for p2p topic event") + } case <-time.After(eventTimeout): require.Fail(s.t, "timeout waiting for p2p topic event") } - // update peer collections and expected documents of subscribed node + // update peer collections of target node for _, collectionIndex := range action.CollectionIDs { if collectionIndex == NonExistentCollectionID { continue // don't track non existent collections } s.nodeP2P[action.NodeID].peerCollections[collectionIndex] = struct{}{} - if collectionIndex >= len(s.documents) { - continue // no documents to track - } - // all previous documents should be merged on the subscriber node - for _, doc := range s.documents[collectionIndex] { - s.nodeP2P[action.NodeID].expectedDocHeads[doc.ID().String()] = doc.Head() - } } } @@ -123,8 +131,10 @@ func waitForSubscribeToCollectionEvent(s *state, action SubscribeToCollection) { // p2p topic completed event on the local event bus. func waitForUnsubscribeToCollectionEvent(s *state, action UnsubscribeToCollection) { select { - case <-s.nodeEvents[action.NodeID].p2pTopic.Message(): - // event recieved + case _, ok := <-s.nodeEvents[action.NodeID].p2pTopic.Message(): + if !ok { + require.Fail(s.t, "subscription closed waiting for p2p topic event") + } case <-time.After(eventTimeout): require.Fail(s.t, "timeout waiting for p2p topic event") @@ -150,7 +160,10 @@ func waitForUpdateEvents(s *state, nodeID immutable.Option[int], collectionID in var evt event.Update select { - case msg := <-s.nodeEvents[i].update.Message(): + case msg, ok := <-s.nodeEvents[i].update.Message(): + if !ok { + require.Fail(s.t, "subscription closed waiting for update event") + } evt = msg.Data.(event.Update) case <-time.After(eventTimeout): @@ -160,35 +173,19 @@ func waitForUpdateEvents(s *state, nodeID immutable.Option[int], collectionID in // update the actual document head on the node that updated it s.nodeP2P[i].actualDocHeads[evt.DocID] = evt.Cid - // update the expected document heads of connected nodes - // - // connected nodes share updates of documents they have in common - for id := range s.nodeP2P[i].connections { - if _, ok := s.nodeP2P[id].actualDocHeads[evt.DocID]; ok { - s.nodeP2P[id].expectedDocHeads[evt.DocID] = evt.Cid - } + // update the expected document heads of replicator targets + for id := range s.nodeP2P[i].replicators { + // replicator target nodes push updates to source nodes + s.nodeP2P[id].expectedDocHeads[evt.DocID] = evt.Cid } - // update the expected document heads of replicator sources - // - // replicator source nodes receive updates from target nodes - for id := range s.nodeP2P[i].replicatorTargets { + // update the expected document heads of connected nodes + for id := range s.nodeP2P[i].connections { + // connected nodes share updates of documents they have in common if _, ok := s.nodeP2P[id].actualDocHeads[evt.DocID]; ok { s.nodeP2P[id].expectedDocHeads[evt.DocID] = evt.Cid } - } - - // update the expected document heads of replicator targets - // - // replicator target nodes push updates to source nodes - for id := range s.nodeP2P[i].replicatorSources { - s.nodeP2P[id].expectedDocHeads[evt.DocID] = evt.Cid - } - - // update the expected document heads of peer collection subs - // - // peer collection subscribers receive updates from any other subscriber node - for id := range s.nodes { + // peer collection subscribers receive updates from any other subscriber node if _, ok := s.nodeP2P[id].peerCollections[collectionID]; ok { s.nodeP2P[id].expectedDocHeads[evt.DocID] = evt.Cid } @@ -201,12 +198,9 @@ func waitForUpdateEvents(s *state, nodeID immutable.Option[int], collectionID in // Will fail the test if an event is not received within the expected time interval to prevent tests // from running forever. func waitForMergeEvents(s *state, action WaitForSync) { - var timeout time.Duration - if action.ExpectedTimeout != 0 { - timeout = action.ExpectedTimeout - } else { - timeout = eventTimeout - } + // use a longer timeout since the sync process can take a while + ctx, cancel := context.WithTimeout(s.ctx, 60*time.Second) + defer cancel() for nodeID := 0; nodeID < len(s.nodes); nodeID++ { expect := s.nodeP2P[nodeID].expectedDocHeads @@ -218,6 +212,7 @@ func waitForMergeEvents(s *state, action WaitForSync) { delete(expect, key) } } + // wait for all expected doc heads to be merged // // the order of merges does not matter as we only @@ -227,10 +222,13 @@ func waitForMergeEvents(s *state, action WaitForSync) { for len(expect) > 0 { var evt event.Merge select { - case msg := <-s.nodeEvents[nodeID].merge.Message(): + case msg, ok := <-s.nodeEvents[nodeID].merge.Message(): + if !ok { + require.Fail(s.t, "subscription closed waiting for merge complete event") + } evt = msg.Data.(event.Merge) - case <-time.After(timeout): + case <-ctx.Done(): require.Fail(s.t, "timeout waiting for merge complete event") } diff --git a/tests/integration/p2p.go b/tests/integration/p2p.go index 38b56019d0..91ea92cc26 100644 --- a/tests/integration/p2p.go +++ b/tests/integration/p2p.go @@ -16,7 +16,6 @@ import ( "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/net" - "github.com/libp2p/go-libp2p/core/peer" "github.com/sourcenetwork/corelog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -150,13 +149,18 @@ func connectPeers( sourceNode := s.nodes[cfg.SourceNodeID] targetNode := s.nodes[cfg.TargetNodeID] - addrs := []peer.AddrInfo{targetNode.PeerInfo()} - log.InfoContext(s.ctx, "Connecting to peers", corelog.Any("Addresses", addrs)) + sourceAddr := sourceNode.PeerInfo() + targetAddr := targetNode.PeerInfo() - for _, addr := range addrs { - err := sourceNode.Connect(s.ctx, addr) - require.NoError(s.t, err) - } + log.InfoContext(s.ctx, "Connecting to peers", + corelog.Any("Source", sourceAddr), + corelog.Any("Target", targetAddr)) + + err := targetNode.Connect(s.ctx, sourceAddr) + require.NoError(s.t, err) + + err = sourceNode.Connect(s.ctx, targetAddr) + require.NoError(s.t, err) s.nodeP2P[cfg.SourceNodeID].connections[cfg.TargetNodeID] = struct{}{} s.nodeP2P[cfg.TargetNodeID].connections[cfg.SourceNodeID] = struct{}{} diff --git a/tests/integration/state.go b/tests/integration/state.go index a51ed1dd9b..888136fce3 100644 --- a/tests/integration/state.go +++ b/tests/integration/state.go @@ -32,15 +32,10 @@ type p2pState struct { // The map key is the connected node id. connections map[int]struct{} - // replicatorSources is a mapping of replicator sources to targets. - // - // The map key is the target node id. - replicatorSources map[int]struct{} - - // replicatorTargets is a mapping of replicator targets to sources. + // replicators is a mapping of replicator targets. // // The map key is the source node id. - replicatorTargets map[int]struct{} + replicators map[int]struct{} // peerCollections contains all active peer collection subscriptions. // @@ -61,12 +56,11 @@ type p2pState struct { // newP2PState returns a new empty p2p state. func newP2PState() *p2pState { return &p2pState{ - connections: make(map[int]struct{}), - replicatorSources: make(map[int]struct{}), - replicatorTargets: make(map[int]struct{}), - peerCollections: make(map[int]struct{}), - actualDocHeads: make(map[string]cid.Cid), - expectedDocHeads: make(map[string]cid.Cid), + connections: make(map[int]struct{}), + replicators: make(map[int]struct{}), + peerCollections: make(map[int]struct{}), + actualDocHeads: make(map[string]cid.Cid), + expectedDocHeads: make(map[string]cid.Cid), } } diff --git a/tests/integration/utils2.go b/tests/integration/utils2.go index 9695371425..1395923655 100644 --- a/tests/integration/utils2.go +++ b/tests/integration/utils2.go @@ -720,7 +720,7 @@ func restartNodes( eventState, err := newEventState(c.Events()) require.NoError(s.t, err) - s.nodeEvents = append(s.nodeEvents, eventState) + s.nodeEvents[i] = eventState waitForNetworkSetupEvents(s, i) } @@ -1182,7 +1182,9 @@ func createDoc( s.documents[action.CollectionID] = append(s.documents[action.CollectionID], docs...) if action.ExpectedError == "" { - waitForUpdateEvents(s, action.NodeID, action.CollectionID) + for i := 0; i < len(docs); i++ { + waitForUpdateEvents(s, action.NodeID, action.CollectionID) + } } } From 301445cb0e21caecc0bd7a2dd06535ac7a8015c1 Mon Sep 17 00:00:00 2001 From: Keenan Nemetz Date: Wed, 17 Jul 2024 13:08:46 -0700 Subject: [PATCH 10/14] add required sleeps to p2p test actions --- net/server.go | 75 ++++++++++++++++++++++++++--------- tests/clients/cli/wrapper.go | 4 +- tests/clients/clients.go | 4 +- tests/clients/http/wrapper.go | 4 +- tests/integration/client.go | 6 +-- tests/integration/events.go | 29 +++++++------- tests/integration/p2p.go | 31 +++++++++------ tests/integration/state.go | 2 +- 8 files changed, 99 insertions(+), 56 deletions(-) diff --git a/net/server.go b/net/server.go index 3b4922fe5e..0e36eb7e3c 100644 --- a/net/server.go +++ b/net/server.go @@ -125,18 +125,24 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL if err != nil { return nil, err } + + log.InfoContext(ctx, "Received pushlog", + corelog.Any("PeerID", pid.String()), + corelog.Any("Creator", byPeer.String()), + corelog.Any("DocID", docID.String())) + + log.InfoContext(ctx, "Starting DAG sync", + corelog.Any("PeerID", pid.String()), + corelog.Any("DocID", docID.String())) + err = syncDAG(ctx, s.peer.bserv, block) if err != nil { return nil, err } - s.peer.bus.Publish(event.NewMessage(event.MergeName, event.Merge{ - DocID: docID.String(), - ByPeer: byPeer, - FromPeer: pid, - Cid: headCID, - SchemaRoot: string(req.Body.SchemaRoot), - })) + log.InfoContext(ctx, "DAG sync complete", + corelog.Any("PeerID", pid.String()), + corelog.Any("DocID", docID.String())) // Once processed, subscribe to the DocID topic on the pubsub network unless we already // suscribe to the collection. @@ -146,6 +152,15 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL return nil, err } } + + s.peer.bus.Publish(event.NewMessage(event.MergeName, event.Merge{ + DocID: docID.String(), + ByPeer: byPeer, + FromPeer: pid, + Cid: headCID, + SchemaRoot: string(req.Body.SchemaRoot), + })) + return &pb.PushLogReply{}, nil } @@ -163,6 +178,10 @@ func (s *server) addPubSubTopic(topic string, subscribe bool) error { return nil } + log.InfoContext(s.peer.ctx, "Adding pubsub topic", + corelog.String("PeerID", s.peer.PeerID().String()), + corelog.String("Topic", topic)) + s.mu.Lock() defer s.mu.Unlock() if t, ok := s.topics[topic]; ok { @@ -205,6 +224,10 @@ func (s *server) removePubSubTopic(topic string) error { return nil } + log.InfoContext(s.peer.ctx, "Removing pubsub topic", + corelog.String("PeerID", s.peer.PeerID().String()), + corelog.String("Topic", topic)) + s.mu.Lock() defer s.mu.Unlock() if t, ok := s.topics[topic]; ok { @@ -218,6 +241,10 @@ func (s *server) removeAllPubsubTopics() error { if s.peer.ps == nil { return nil } + + log.InfoContext(s.peer.ctx, "Removing all pubsub topics", + corelog.String("PeerID", s.peer.PeerID().String())) + s.mu.Lock() defer s.mu.Unlock() for id, t := range s.topics { @@ -232,6 +259,10 @@ func (s *server) removeAllPubsubTopics() error { // publishLog publishes the given PushLogRequest object on the PubSub network via the // corresponding topic func (s *server) publishLog(ctx context.Context, topic string, req *pb.PushLogRequest) error { + log.InfoContext(ctx, "Publish log", + corelog.String("PeerID", s.peer.PeerID().String()), + corelog.String("Topic", topic)) + if s.peer.ps == nil { // skip if we aren't running with a pubsub net return nil } @@ -259,12 +290,16 @@ func (s *server) publishLog(ctx context.Context, topic string, req *pb.PushLogRe // pubSubMessageHandler handles incoming PushLog messages from the pubsub network. func (s *server) pubSubMessageHandler(from libpeer.ID, topic string, msg []byte) ([]byte, error) { + log.InfoContext(s.peer.ctx, "Received new pubsub event", + corelog.String("PeerID", s.peer.PeerID().String()), + corelog.Any("SenderId", from), + corelog.String("Topic", topic)) + req := new(pb.PushLogRequest) if err := proto.Unmarshal(msg, req); err != nil { log.ErrorContextE(s.peer.ctx, "Failed to unmarshal pubsub message %s", err) return nil, err } - ctx := grpcpeer.NewContext(s.peer.ctx, &grpcpeer.Peer{ Addr: addr{from}, }) @@ -276,9 +311,8 @@ func (s *server) pubSubMessageHandler(from libpeer.ID, topic string, msg []byte) // pubSubEventHandler logs events from the subscribed DocID topics. func (s *server) pubSubEventHandler(from libpeer.ID, topic string, msg []byte) { - log.InfoContext( - s.peer.ctx, - "Received new pubsub event", + log.InfoContext(s.peer.ctx, "Received new pubsub event", + corelog.String("PeerID", s.peer.PeerID().String()), corelog.Any("SenderId", from), corelog.String("Topic", topic), corelog.String("Message", string(msg)), @@ -329,7 +363,18 @@ func (s *server) updatePubSubTopics(evt event.P2PTopic) { } func (s *server) updateReplicators(evt event.Replicator) { - isDeleteRep := len(evt.Schemas) == 0 + if len(evt.Schemas) == 0 { + // remove peer from store + s.peer.host.Peerstore().ClearAddrs(evt.Info.ID) + } else { + // add peer to store + s.peer.host.Peerstore().AddAddrs(evt.Info.ID, evt.Info.Addrs, peerstore.PermanentAddrTTL) + // connect to the peer + if err := s.peer.Connect(s.peer.ctx, evt.Info); err != nil { + log.ErrorContextE(s.peer.ctx, "Failed to connect to replicator peer", err) + } + } + // update the cached replicators s.mu.Lock() for schema, peers := range s.replicators { @@ -350,12 +395,6 @@ func (s *server) updateReplicators(evt event.Replicator) { } s.mu.Unlock() - if isDeleteRep { - s.peer.host.Peerstore().ClearAddrs(evt.Info.ID) - } else { - s.peer.host.Peerstore().AddAddrs(evt.Info.ID, evt.Info.Addrs, peerstore.PermanentAddrTTL) - } - if evt.Docs != nil { for update := range evt.Docs { if err := s.pushLog(s.peer.ctx, update, evt.Info.ID); err != nil { diff --git a/tests/clients/cli/wrapper.go b/tests/clients/cli/wrapper.go index d4976420b5..a6f3feef4c 100644 --- a/tests/clients/cli/wrapper.go +++ b/tests/clients/cli/wrapper.go @@ -537,6 +537,6 @@ func (w *Wrapper) PrintDump(ctx context.Context) error { return w.node.DB.PrintDump(ctx) } -func (w *Wrapper) Connect(ctx context.Context, addr peer.AddrInfo) error { - return w.node.Peer.Connect(ctx, addr) +func (w *Wrapper) Bootstrap(addrs []peer.AddrInfo) { + w.node.Peer.Bootstrap(addrs) } diff --git a/tests/clients/clients.go b/tests/clients/clients.go index 2a67ed0812..f5d822ab39 100644 --- a/tests/clients/clients.go +++ b/tests/clients/clients.go @@ -11,8 +11,6 @@ package clients import ( - "context" - "github.com/libp2p/go-libp2p/core/peer" "github.com/sourcenetwork/defradb/client" @@ -22,5 +20,5 @@ import ( // required for testing. type Client interface { client.DB - Connect(context.Context, peer.AddrInfo) error + Bootstrap([]peer.AddrInfo) } diff --git a/tests/clients/http/wrapper.go b/tests/clients/http/wrapper.go index 7f45671b57..f183a4671a 100644 --- a/tests/clients/http/wrapper.go +++ b/tests/clients/http/wrapper.go @@ -233,6 +233,6 @@ func (w *Wrapper) PrintDump(ctx context.Context) error { return w.node.DB.PrintDump(ctx) } -func (w *Wrapper) Connect(ctx context.Context, addr peer.AddrInfo) error { - return w.node.Peer.Connect(ctx, addr) +func (w *Wrapper) Bootstrap(addrs []peer.AddrInfo) { + w.node.Peer.Bootstrap(addrs) } diff --git a/tests/integration/client.go b/tests/integration/client.go index aae14080a2..dee5a3f0c9 100644 --- a/tests/integration/client.go +++ b/tests/integration/client.go @@ -11,7 +11,6 @@ package tests import ( - "context" "fmt" "os" "strconv" @@ -101,11 +100,10 @@ func newGoClientWrapper(n *node.Node) *goClientWrapper { } } -func (w *goClientWrapper) Connect(ctx context.Context, addr peer.AddrInfo) error { +func (w *goClientWrapper) Bootstrap(addrs []peer.AddrInfo) { if w.peer != nil { - return w.peer.Connect(ctx, addr) + w.peer.Bootstrap(addrs) } - return nil } func (w *goClientWrapper) Close() { diff --git a/tests/integration/events.go b/tests/integration/events.go index 7f7b29a7b3..8b5754674c 100644 --- a/tests/integration/events.go +++ b/tests/integration/events.go @@ -11,7 +11,6 @@ package tests import ( - "context" "time" "github.com/sourcenetwork/immutable" @@ -33,15 +32,10 @@ func waitForNetworkSetupEvents(s *state, nodeID int) { reps, err := s.nodes[nodeID].GetAllReplicators(s.ctx) require.NoError(s.t, err) - p2pTopicEvents := 0 replicatorEvents := len(reps) + p2pTopicEvent := len(cols) > 0 - // there is only one message for loading of P2P collections - if len(cols) > 0 { - p2pTopicEvents = 1 - } - - for p2pTopicEvents > 0 && replicatorEvents > 0 { + for p2pTopicEvent && replicatorEvents > 0 { select { case _, ok := <-s.nodeEvents[nodeID].replicator.Message(): if !ok { @@ -53,7 +47,7 @@ func waitForNetworkSetupEvents(s *state, nodeID int) { if !ok { require.Fail(s.t, "subscription closed waiting for network setup events") } - p2pTopicEvents-- + p2pTopicEvent = false case <-time.After(eventTimeout): s.t.Fatalf("timeout waiting for network setup events") @@ -100,6 +94,8 @@ func waitForReplicatorDeleteEvent(s *state, cfg DeleteReplicator) { require.Fail(s.t, "timeout waiting for replicator event") } + delete(s.nodeP2P[cfg.TargetNodeID].connections, cfg.SourceNodeID) + delete(s.nodeP2P[cfg.SourceNodeID].connections, cfg.TargetNodeID) delete(s.nodeP2P[cfg.SourceNodeID].replicators, cfg.TargetNodeID) } @@ -170,6 +166,15 @@ func waitForUpdateEvents(s *state, nodeID immutable.Option[int], collectionID in require.Fail(s.t, "timeout waiting for update event") } + if i >= len(s.nodeConfigs) { + return // not testing network state + } + + // make sure the event is published on the network before proceeding + // this prevents nodes from missing messages that are sent before + // subscriptions are setup + time.Sleep(100 * time.Millisecond) + // update the actual document head on the node that updated it s.nodeP2P[i].actualDocHeads[evt.DocID] = evt.Cid @@ -198,10 +203,6 @@ func waitForUpdateEvents(s *state, nodeID immutable.Option[int], collectionID in // Will fail the test if an event is not received within the expected time interval to prevent tests // from running forever. func waitForMergeEvents(s *state, action WaitForSync) { - // use a longer timeout since the sync process can take a while - ctx, cancel := context.WithTimeout(s.ctx, 60*time.Second) - defer cancel() - for nodeID := 0; nodeID < len(s.nodes); nodeID++ { expect := s.nodeP2P[nodeID].expectedDocHeads @@ -228,7 +229,7 @@ func waitForMergeEvents(s *state, action WaitForSync) { } evt = msg.Data.(event.Merge) - case <-ctx.Done(): + case <-time.After(30 * eventTimeout): require.Fail(s.t, "timeout waiting for merge complete event") } diff --git a/tests/integration/p2p.go b/tests/integration/p2p.go index 91ea92cc26..aaf7d72198 100644 --- a/tests/integration/p2p.go +++ b/tests/integration/p2p.go @@ -13,6 +13,7 @@ package tests import ( "time" + "github.com/libp2p/go-libp2p/core/peer" "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/net" @@ -149,21 +150,17 @@ func connectPeers( sourceNode := s.nodes[cfg.SourceNodeID] targetNode := s.nodes[cfg.TargetNodeID] - sourceAddr := sourceNode.PeerInfo() - targetAddr := targetNode.PeerInfo() - - log.InfoContext(s.ctx, "Connecting to peers", - corelog.Any("Source", sourceAddr), - corelog.Any("Target", targetAddr)) - - err := targetNode.Connect(s.ctx, sourceAddr) - require.NoError(s.t, err) - - err = sourceNode.Connect(s.ctx, targetAddr) - require.NoError(s.t, err) + addrs := []peer.AddrInfo{targetNode.PeerInfo()} + log.InfoContext(s.ctx, "Bootstrapping with peers", corelog.Any("Addresses", addrs)) + sourceNode.Bootstrap(addrs) s.nodeP2P[cfg.SourceNodeID].connections[cfg.TargetNodeID] = struct{}{} s.nodeP2P[cfg.TargetNodeID].connections[cfg.SourceNodeID] = struct{}{} + + // Bootstrap triggers a bunch of async stuff for which we have no good way of waiting on. It must be + // allowed to complete before documentation begins or it will not even try and sync it. So for now, we + // sleep a little. + time.Sleep(100 * time.Millisecond) } // configureReplicator configures a replicator relationship between two existing, started, nodes. @@ -231,6 +228,11 @@ func subscribeToCollection( expectedErrorRaised := AssertError(s.t, s.testCase.Description, err, action.ExpectedError) assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised) + + // The `n.Peer.AddP2PCollections(colIDs)` call above is calling some asynchronous functions + // for the pubsub subscription and those functions can take a bit of time to complete, + // we need to make sure this has finished before progressing. + time.Sleep(100 * time.Millisecond) } // unsubscribeToCollection removes the given collections from subscriptions on the given nodes. @@ -260,6 +262,11 @@ func unsubscribeToCollection( expectedErrorRaised := AssertError(s.t, s.testCase.Description, err, action.ExpectedError) assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised) + + // The `n.Peer.RemoveP2PCollections(colIDs)` call above is calling some asynchronous functions + // for the pubsub subscription and those functions can take a bit of time to complete, + // we need to make sure this has finished before progressing. + time.Sleep(100 * time.Millisecond) } // getAllP2PCollections gets all the active peer subscriptions and compares them against the diff --git a/tests/integration/state.go b/tests/integration/state.go index 888136fce3..9300d5f090 100644 --- a/tests/integration/state.go +++ b/tests/integration/state.go @@ -25,7 +25,7 @@ import ( "github.com/sourcenetwork/defradb/tests/clients" ) -// p2pState contains all p2p related testing test. +// p2pState contains all p2p related testing state. type p2pState struct { // connections contains all connected nodes. // From 666dbd2c2375f3c0962799280298413a0606de08 Mon Sep 17 00:00:00 2001 From: Keenan Nemetz Date: Wed, 17 Jul 2024 13:19:27 -0700 Subject: [PATCH 11/14] fix acp tests --- tests/integration/acp/register_and_update_test.go | 4 ++++ tests/integration/events.go | 2 +- tests/integration/net/simple/peer/with_delete_test.go | 10 ++-------- .../net/simple/replicator/with_create_test.go | 2 -- tests/integration/p2p.go | 5 +---- tests/integration/test_case.go | 8 +++----- tests/integration/utils2.go | 4 ++-- 7 files changed, 13 insertions(+), 22 deletions(-) diff --git a/tests/integration/acp/register_and_update_test.go b/tests/integration/acp/register_and_update_test.go index edfeb99c97..d7a72a8681 100644 --- a/tests/integration/acp/register_and_update_test.go +++ b/tests/integration/acp/register_and_update_test.go @@ -650,6 +650,8 @@ func TestACP_CreateWithIdentityAndUpdateWithoutIdentityGQL_CanNotUpdate(t *testi "name": "Shahzad Lone" } `, + + SkipUpdateEvent: true, }, testUtils.Request{ @@ -764,6 +766,8 @@ func TestACP_CreateWithIdentityAndUpdateWithWrongIdentityGQL_CanNotUpdate(t *tes "name": "Shahzad Lone" } `, + + SkipUpdateEvent: true, }, testUtils.Request{ diff --git a/tests/integration/events.go b/tests/integration/events.go index 8b5754674c..946c089d4a 100644 --- a/tests/integration/events.go +++ b/tests/integration/events.go @@ -202,7 +202,7 @@ func waitForUpdateEvents(s *state, nodeID immutable.Option[int], collectionID in // // Will fail the test if an event is not received within the expected time interval to prevent tests // from running forever. -func waitForMergeEvents(s *state, action WaitForSync) { +func waitForMergeEvents(s *state) { for nodeID := 0; nodeID < len(s.nodes); nodeID++ { expect := s.nodeP2P[nodeID].expectedDocHeads diff --git a/tests/integration/net/simple/peer/with_delete_test.go b/tests/integration/net/simple/peer/with_delete_test.go index b0b5fe3ded..01f03e2c75 100644 --- a/tests/integration/net/simple/peer/with_delete_test.go +++ b/tests/integration/net/simple/peer/with_delete_test.go @@ -175,7 +175,6 @@ func TestP2PWithMultipleDocumentsWithSingleUpdateBeforeConnectSingleDeleteWithSh Doc: `{ "Age": 60 }`, - DontSync: true, }, testUtils.ConnectPeers{ SourceNodeID: 0, @@ -247,7 +246,6 @@ func TestP2PWithMultipleDocumentsWithMultipleUpdatesBeforeConnectSingleDeleteWit Doc: `{ "Age": 60 }`, - DontSync: true, }, testUtils.UpdateDoc{ // Update John's Age on the first node only @@ -256,7 +254,6 @@ func TestP2PWithMultipleDocumentsWithMultipleUpdatesBeforeConnectSingleDeleteWit Doc: `{ "Age": 62 }`, - DontSync: true, }, testUtils.ConnectPeers{ SourceNodeID: 0, @@ -328,7 +325,6 @@ func TestP2PWithMultipleDocumentsWithUpdateAndDeleteBeforeConnectSingleDeleteWit Doc: `{ "Age": 60 }`, - DontSync: true, }, testUtils.UpdateDoc{ // Update John's Age on the first node only @@ -337,12 +333,10 @@ func TestP2PWithMultipleDocumentsWithUpdateAndDeleteBeforeConnectSingleDeleteWit Doc: `{ "Age": 62 }`, - DontSync: true, }, testUtils.DeleteDoc{ - NodeID: immutable.Some(0), - DocID: 0, - DontSync: true, + NodeID: immutable.Some(0), + DocID: 0, }, testUtils.ConnectPeers{ SourceNodeID: 0, diff --git a/tests/integration/net/simple/replicator/with_create_test.go b/tests/integration/net/simple/replicator/with_create_test.go index 5785337b98..08ef2baeec 100644 --- a/tests/integration/net/simple/replicator/with_create_test.go +++ b/tests/integration/net/simple/replicator/with_create_test.go @@ -12,7 +12,6 @@ package replicator import ( "testing" - "time" "github.com/sourcenetwork/immutable" @@ -182,7 +181,6 @@ func TestP2POneToOneReplicatorDoesNotSyncFromDeletedReplicator(t *testing.T) { }, testUtils.WaitForSync{ // No documents should be synced - ExpectedTimeout: 100 * time.Millisecond, }, testUtils.Request{ // Assert that John has not been synced to the second (target) node diff --git a/tests/integration/p2p.go b/tests/integration/p2p.go index aaf7d72198..e15f7e9cfc 100644 --- a/tests/integration/p2p.go +++ b/tests/integration/p2p.go @@ -134,10 +134,7 @@ type GetAllP2PCollections struct { // // For example you will likely wish to `WaitForSync` after creating a document in node 0 before querying // node 1 to see if it has been replicated. -type WaitForSync struct { - // ExpectedTimeout is the duration to wait when expecting a timeout to occur. - ExpectedTimeout time.Duration -} +type WaitForSync struct{} // connectPeers connects two existing, started, nodes as peers. It returns a channel // that will receive an empty struct upon sync completion of all expected peer-sync events. diff --git a/tests/integration/test_case.go b/tests/integration/test_case.go index ad5a86a665..e2717c3875 100644 --- a/tests/integration/test_case.go +++ b/tests/integration/test_case.go @@ -311,9 +311,6 @@ type DeleteDoc struct { // String can be a partial, and the test will pass if an error is returned that // contains this string. ExpectedError string - - // Setting DontSync to true will prevent waiting for that delete. - DontSync bool } // UpdateDoc will attempt to update the given document using the set [MutationType]. @@ -349,8 +346,9 @@ type UpdateDoc struct { // contains this string. ExpectedError string - // Setting DontSync to true will prevent waiting for that update. - DontSync bool + // Skip waiting for an update event. This should only be used for + // tests that do not correctly publish an update event. + SkipUpdateEvent bool } // IndexField describes a field to be indexed. diff --git a/tests/integration/utils2.go b/tests/integration/utils2.go index 1395923655..6c2c9a1d69 100644 --- a/tests/integration/utils2.go +++ b/tests/integration/utils2.go @@ -341,7 +341,7 @@ func performAction( assertClientIntrospectionResults(s, action) case WaitForSync: - waitForMergeEvents(s, action) + waitForMergeEvents(s) case Benchmark: benchmarkAction(s, actionIndex, action) @@ -1425,7 +1425,7 @@ func updateDoc( assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised) - if action.ExpectedError == "" { + if action.ExpectedError == "" && !action.SkipUpdateEvent { waitForUpdateEvents(s, action.NodeID, action.CollectionID) } } From 6e8fc67c185e6f542cea32a4d5a8d256e8f1816e Mon Sep 17 00:00:00 2001 From: Keenan Nemetz Date: Wed, 17 Jul 2024 13:56:47 -0700 Subject: [PATCH 12/14] notify exchange of new block before push log --- net/peer.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/net/peer.go b/net/peer.go index 768a255fc3..7222d7cf9f 100644 --- a/net/peer.go +++ b/net/peer.go @@ -24,6 +24,7 @@ import ( "github.com/ipfs/boxo/blockservice" exchange "github.com/ipfs/boxo/exchange" "github.com/ipfs/boxo/ipns" + blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" libp2p "github.com/libp2p/go-libp2p" @@ -446,6 +447,13 @@ func (p *Peer) handleDocUpdateLog(evt event.Update) error { } func (p *Peer) pushLogToReplicators(lg event.Update) { + // let the exchange know we have this block + // this should speed up the dag sync process + err := p.bserv.Exchange().NotifyNewBlocks(p.ctx, blocks.NewBlock(lg.Block)) + if err != nil { + log.ErrorContextE(p.ctx, "Failed to notify new blocks", err) + } + // push to each peer (replicator) peers := make(map[string]struct{}) for _, peer := range p.ps.ListPeers(lg.DocID) { From d036cb69bf0ba8d3faba15f552064d0bf303ca96 Mon Sep 17 00:00:00 2001 From: Keenan Nemetz Date: Wed, 17 Jul 2024 14:12:47 -0700 Subject: [PATCH 13/14] fix linter error --- tests/integration/p2p.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/p2p.go b/tests/integration/p2p.go index e15f7e9cfc..b1b79982cf 100644 --- a/tests/integration/p2p.go +++ b/tests/integration/p2p.go @@ -13,10 +13,10 @@ package tests import ( "time" - "github.com/libp2p/go-libp2p/core/peer" "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/net" + "github.com/libp2p/go-libp2p/core/peer" "github.com/sourcenetwork/corelog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" From 5596026f348a1bdbe90692f3bad9fa14331c7e6f Mon Sep 17 00:00:00 2001 From: Keenan Nemetz Date: Thu, 18 Jul 2024 09:22:39 -0700 Subject: [PATCH 14/14] rename SkipUpdateEvent to SkipLocalUpdateEvent --- tests/integration/acp/register_and_update_test.go | 4 ++-- tests/integration/test_case.go | 8 +++++--- tests/integration/utils2.go | 2 +- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/tests/integration/acp/register_and_update_test.go b/tests/integration/acp/register_and_update_test.go index d7a72a8681..842ef1b754 100644 --- a/tests/integration/acp/register_and_update_test.go +++ b/tests/integration/acp/register_and_update_test.go @@ -651,7 +651,7 @@ func TestACP_CreateWithIdentityAndUpdateWithoutIdentityGQL_CanNotUpdate(t *testi } `, - SkipUpdateEvent: true, + SkipLocalUpdateEvent: true, }, testUtils.Request{ @@ -767,7 +767,7 @@ func TestACP_CreateWithIdentityAndUpdateWithWrongIdentityGQL_CanNotUpdate(t *tes } `, - SkipUpdateEvent: true, + SkipLocalUpdateEvent: true, }, testUtils.Request{ diff --git a/tests/integration/test_case.go b/tests/integration/test_case.go index 945a3e6d11..1ebf396253 100644 --- a/tests/integration/test_case.go +++ b/tests/integration/test_case.go @@ -353,9 +353,11 @@ type UpdateDoc struct { // contains this string. ExpectedError string - // Skip waiting for an update event. This should only be used for - // tests that do not correctly publish an update event. - SkipUpdateEvent bool + // Skip waiting for an update event on the local event bus. + // + // This should only be used for tests that do not correctly + // publish an update event to the local event bus. + SkipLocalUpdateEvent bool } // IndexField describes a field to be indexed. diff --git a/tests/integration/utils2.go b/tests/integration/utils2.go index c055fe03cb..67ed5dd3d1 100644 --- a/tests/integration/utils2.go +++ b/tests/integration/utils2.go @@ -1430,7 +1430,7 @@ func updateDoc( assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised) - if action.ExpectedError == "" && !action.SkipUpdateEvent { + if action.ExpectedError == "" && !action.SkipLocalUpdateEvent { waitForUpdateEvents(s, action.NodeID, action.CollectionID) } }