refactor: Network test sync logic #2748

Merged · 19 commits · Jul 18, 2024
Changes from 12 commits
5 changes: 5 additions & 0 deletions net/peer.go
@@ -504,6 +504,11 @@ func stopGRPCServer(ctx context.Context, server *grpc.Server) {
 	}
 }
 
+// Connect initiates a connection to the peer with the given address.
+func (p *Peer) Connect(ctx context.Context, addr peer.AddrInfo) error {
+	return p.host.Connect(ctx, addr)
+}
+
 // Bootstrap connects to the given peers.
 func (p *Peer) Bootstrap(addrs []peer.AddrInfo) {
 	var connected uint64
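
For context, callers are expected to build the peer.AddrInfo from a full p2p multiaddr before dialing. A minimal sketch of using the new method (the helper name and imports are illustrative, not part of this change):

import (
	"context"

	"github.com/libp2p/go-libp2p/core/peer"

	"github.com/sourcenetwork/defradb/net"
)

// connectByMultiaddr is a hypothetical helper: it parses a full p2p multiaddr
// into an AddrInfo and dials it through the new Peer.Connect method, which
// simply delegates to the underlying libp2p host.
func connectByMultiaddr(ctx context.Context, p *net.Peer, maddr string) error {
	info, err := peer.AddrInfoFromString(maddr)
	if err != nil {
		return err
	}
	return p.Connect(ctx, *info)
}
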
4 changes: 2 additions & 2 deletions tests/clients/cli/wrapper.go
@@ -537,6 +537,6 @@ func (w *Wrapper) PrintDump(ctx context.Context) error {
 	return w.node.DB.PrintDump(ctx)
 }
 
-func (w *Wrapper) Bootstrap(addrs []peer.AddrInfo) {
-	w.node.Peer.Bootstrap(addrs)
+func (w *Wrapper) Connect(ctx context.Context, addr peer.AddrInfo) error {
+	return w.node.Peer.Connect(ctx, addr)
 }
4 changes: 3 additions & 1 deletion tests/clients/clients.go
@@ -11,6 +11,8 @@
 package clients
 
 import (
+	"context"
+
 	"github.com/libp2p/go-libp2p/core/peer"
 
 	"github.com/sourcenetwork/defradb/client"
@@ -20,5 +22,5 @@ import (
 // required for testing.
 type Client interface {
 	client.DB
-	Bootstrap([]peer.AddrInfo)
+	Connect(context.Context, peer.AddrInfo) error
 }
4 changes: 2 additions & 2 deletions tests/clients/http/wrapper.go
@@ -233,6 +233,6 @@ func (w *Wrapper) PrintDump(ctx context.Context) error {
 	return w.node.DB.PrintDump(ctx)
 }
 
-func (w *Wrapper) Bootstrap(addrs []peer.AddrInfo) {
-	w.node.Peer.Bootstrap(addrs)
+func (w *Wrapper) Connect(ctx context.Context, addr peer.AddrInfo) error {
+	return w.node.Peer.Connect(ctx, addr)
 }
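
With Bootstrap replaced by Connect on the Client interface, both test wrappers must expose the new method. A compile-time conformance check along these lines would make that explicit (assuming the wrapper packages are named after their directories; this snippet is illustrative, not part of the diff):

import (
	"github.com/sourcenetwork/defradb/tests/clients"
	"github.com/sourcenetwork/defradb/tests/clients/cli"
	"github.com/sourcenetwork/defradb/tests/clients/http"
)

// Compile-time assertions that both test wrappers still satisfy the
// updated Client interface, including the new Connect method.
var (
	_ clients.Client = (*cli.Wrapper)(nil)
	_ clients.Client = (*http.Wrapper)(nil)
)
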
6 changes: 4 additions & 2 deletions tests/integration/client.go
@@ -11,6 +11,7 @@
 package tests
 
 import (
+	"context"
 	"fmt"
 	"os"
 	"strconv"
@@ -100,10 +101,11 @@ func newGoClientWrapper(n *node.Node) *goClientWrapper {
 	}
 }
 
-func (w *goClientWrapper) Bootstrap(addrs []peer.AddrInfo) {
+func (w *goClientWrapper) Connect(ctx context.Context, addr peer.AddrInfo) error {
 	if w.peer != nil {
-		w.peer.Bootstrap(addrs)
+		return w.peer.Connect(ctx, addr)
 	}
+	return nil
 }
 
 func (w *goClientWrapper) Close() {
241 changes: 241 additions & 0 deletions tests/integration/events.go
@@ -0,0 +1,241 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tests

import (
"context"
"time"

"github.com/sourcenetwork/immutable"
"github.com/stretchr/testify/require"

"github.com/sourcenetwork/defradb/event"
)

// eventTimeout is the amount of time to wait
// for an event before timing out
const eventTimeout = 1 * time.Second

// waitForNetworkSetupEvents waits for p2p topic completed and
// replicator completed events to be published on the local node event bus.
func waitForNetworkSetupEvents(s *state, nodeID int) {
	cols, err := s.nodes[nodeID].GetAllP2PCollections(s.ctx)
	require.NoError(s.t, err)

	reps, err := s.nodes[nodeID].GetAllReplicators(s.ctx)
	require.NoError(s.t, err)

	p2pTopicEvents := 0

Review comment (Contributor): question: It looks like p2pTopicEvents is only ever 0 or 1, why is it not a boolean? Is this a bug/wip?

Reply (Member, Author): Changed to bool

	replicatorEvents := len(reps)

	// there is only one message for loading of P2P collections
	if len(cols) > 0 {
		p2pTopicEvents = 1
	}

	for p2pTopicEvents > 0 && replicatorEvents > 0 {
		select {
		case _, ok := <-s.nodeEvents[nodeID].replicator.Message():
			if !ok {
				require.Fail(s.t, "subscription closed waiting for network setup events")
			}
			replicatorEvents--

		case _, ok := <-s.nodeEvents[nodeID].p2pTopic.Message():
			if !ok {
				require.Fail(s.t, "subscription closed waiting for network setup events")
			}
			p2pTopicEvents--

		case <-time.After(eventTimeout):
			s.t.Fatalf("timeout waiting for network setup events")
		}
	}
}

// waitForReplicatorConfigureEvent waits for a node to publish a
// replicator completed event on the local event bus.
//
// Expected document heads will be updated for the targeted node.
func waitForReplicatorConfigureEvent(s *state, cfg ConfigureReplicator) {
	select {
	case _, ok := <-s.nodeEvents[cfg.SourceNodeID].replicator.Message():
		if !ok {
			require.Fail(s.t, "subscription closed waiting for replicator event")
		}

	case <-time.After(eventTimeout):
		require.Fail(s.t, "timeout waiting for replicator event")
	}

	// all documents on the source node should be merged on the target node
	for key, val := range s.nodeP2P[cfg.SourceNodeID].actualDocHeads {
		s.nodeP2P[cfg.TargetNodeID].expectedDocHeads[key] = val
	}

	// update node connections and replicators
	s.nodeP2P[cfg.TargetNodeID].connections[cfg.SourceNodeID] = struct{}{}
	s.nodeP2P[cfg.SourceNodeID].connections[cfg.TargetNodeID] = struct{}{}
	s.nodeP2P[cfg.SourceNodeID].replicators[cfg.TargetNodeID] = struct{}{}
}

// waitForReplicatorDeleteEvent waits for a node to publish a
// replicator completed event on the local event bus.
func waitForReplicatorDeleteEvent(s *state, cfg DeleteReplicator) {
	select {
	case _, ok := <-s.nodeEvents[cfg.SourceNodeID].replicator.Message():
		if !ok {
			require.Fail(s.t, "subscription closed waiting for replicator event")
		}

	case <-time.After(eventTimeout):
		require.Fail(s.t, "timeout waiting for replicator event")
	}

	delete(s.nodeP2P[cfg.SourceNodeID].replicators, cfg.TargetNodeID)
}

// waitForSubscribeToCollectionEvent waits for a node to publish a
// p2p topic completed event on the local event bus.
//
// Expected document heads will be updated for the subscriber node.
func waitForSubscribeToCollectionEvent(s *state, action SubscribeToCollection) {
	select {
	case _, ok := <-s.nodeEvents[action.NodeID].p2pTopic.Message():
		if !ok {
			require.Fail(s.t, "subscription closed waiting for p2p topic event")
		}

	case <-time.After(eventTimeout):
		require.Fail(s.t, "timeout waiting for p2p topic event")
	}

	// update peer collections of target node
	for _, collectionIndex := range action.CollectionIDs {
		if collectionIndex == NonExistentCollectionID {
			continue // don't track non-existent collections
		}
		s.nodeP2P[action.NodeID].peerCollections[collectionIndex] = struct{}{}
	}
}

// waitForUnsubscribeToCollectionEvent waits for a node to publish a
// p2p topic completed event on the local event bus.
func waitForUnsubscribeToCollectionEvent(s *state, action UnsubscribeToCollection) {
	select {
	case _, ok := <-s.nodeEvents[action.NodeID].p2pTopic.Message():
		if !ok {
			require.Fail(s.t, "subscription closed waiting for p2p topic event")
		}

	case <-time.After(eventTimeout):
		require.Fail(s.t, "timeout waiting for p2p topic event")
	}

	for _, collectionIndex := range action.CollectionIDs {
		if collectionIndex == NonExistentCollectionID {
			continue // don't track non-existent collections
		}
		delete(s.nodeP2P[action.NodeID].peerCollections, collectionIndex)
	}
}

// waitForUpdateEvents waits for all selected nodes to publish an
// update event to the local event bus.
//
// Expected document heads will be updated for any connected nodes.
func waitForUpdateEvents(s *state, nodeID immutable.Option[int], collectionID int) {
	for i := 0; i < len(s.nodes); i++ {
		if nodeID.HasValue() && nodeID.Value() != i {
			continue // node is not selected
		}

		var evt event.Update
		select {
		case msg, ok := <-s.nodeEvents[i].update.Message():
			if !ok {
				require.Fail(s.t, "subscription closed waiting for update event")
			}
			evt = msg.Data.(event.Update)

		case <-time.After(eventTimeout):
			require.Fail(s.t, "timeout waiting for update event")
		}

		// update the actual document head on the node that updated it
		s.nodeP2P[i].actualDocHeads[evt.DocID] = evt.Cid

		// update the expected document heads of replicator targets
		for id := range s.nodeP2P[i].replicators {
			// replicator source nodes push updates to their target nodes
			s.nodeP2P[id].expectedDocHeads[evt.DocID] = evt.Cid
		}

		// update the expected document heads of connected nodes
		for id := range s.nodeP2P[i].connections {
			// connected nodes share updates of documents they have in common
			if _, ok := s.nodeP2P[id].actualDocHeads[evt.DocID]; ok {
				s.nodeP2P[id].expectedDocHeads[evt.DocID] = evt.Cid
			}
			// peer collection subscribers receive updates from any other subscriber node
			if _, ok := s.nodeP2P[id].peerCollections[collectionID]; ok {
				s.nodeP2P[id].expectedDocHeads[evt.DocID] = evt.Cid
			}
		}
	}
}

// waitForMergeEvents waits for all expected document heads to be merged to all nodes.
//
// Will fail the test if an event is not received within the expected time interval to prevent tests
// from running forever.
func waitForMergeEvents(s *state, action WaitForSync) {
	// use a longer timeout since the sync process can take a while
	ctx, cancel := context.WithTimeout(s.ctx, 60*time.Second)
	defer cancel()

	for nodeID := 0; nodeID < len(s.nodes); nodeID++ {
		expect := s.nodeP2P[nodeID].expectedDocHeads

		// remove any docs that are already merged
		// up to the expected document head
		for key, val := range s.nodeP2P[nodeID].actualDocHeads {
			if head, ok := expect[key]; ok && head.String() == val.String() {
				delete(expect, key)
			}
		}

		// wait for all expected doc heads to be merged
		//
		// the order of merges does not matter as we only
		// expect the latest head to eventually be merged
		//
		// unexpected merge events are ignored
		for len(expect) > 0 {
			var evt event.Merge
			select {
			case msg, ok := <-s.nodeEvents[nodeID].merge.Message():
				if !ok {
					require.Fail(s.t, "subscription closed waiting for merge complete event")
				}
				evt = msg.Data.(event.Merge)

			case <-ctx.Done():
				require.Fail(s.t, "timeout waiting for merge complete event")
			}

			head, ok := expect[evt.DocID]
			if ok && head.String() == evt.Cid.String() {
				delete(expect, evt.DocID)
			}
		}
	}
}
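
These helpers are presumably invoked by the test harness right after the corresponding action is applied to a node, so that expected state is recorded before the next action runs. A rough sketch of that wiring (executeNetworkAction and the elided setup steps are assumptions for illustration, not part of this diff):

// executeNetworkAction illustrates where the wait helpers slot into the test
// loop: each network action is applied to the node(s) and then the matching
// completion event is awaited before the harness moves on.
func executeNetworkAction(s *state, action any) {
	switch a := action.(type) {
	case ConfigureReplicator:
		// ... configure the replicator on the source node ...
		waitForReplicatorConfigureEvent(s, a)
	case DeleteReplicator:
		// ... remove the replicator from the source node ...
		waitForReplicatorDeleteEvent(s, a)
	case SubscribeToCollection:
		// ... subscribe the node to the collection's p2p topics ...
		waitForSubscribeToCollectionEvent(s, a)
	case UnsubscribeToCollection:
		// ... unsubscribe the node from the collection's p2p topics ...
		waitForUnsubscribeToCollectionEvent(s, a)
	case WaitForSync:
		waitForMergeEvents(s, a)
	}
}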