Skip to content

Commit

Permalink
add wait on restart
Browse files Browse the repository at this point in the history
  • Loading branch information
fredcarle committed Jun 26, 2024
1 parent 2fcac46 commit 6a5071e
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 2 deletions.
4 changes: 2 additions & 2 deletions net/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ func NewPeer(
cancel: cancel,
}

p.bus.Publish(event.NewMessage(event.PeerInfoName, event.PeerInfo{Info: p.PeerInfo()}))

p.server, err = newServer(p, options.GRPCDialOptions...)
if err != nil {
return nil, err
Expand All @@ -231,8 +233,6 @@ func NewPeer(

// Start all the internal workers/goroutines/loops that manage the P2P state.
func (p *Peer) Start() error {
p.bus.Publish(event.NewMessage(event.PeerInfoName, event.PeerInfo{Info: p.PeerInfo()}))

// reconnect to known peers
var wg sync.WaitGroup
for _, id := range p.host.Peerstore().PeersWithAddrs() {
Expand Down
24 changes: 24 additions & 0 deletions tests/integration/utils2.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,22 @@ func restartNodes(
continue
}

// We need to ensure that on restart, the node pubsub is configured before
// we continue with the test. Otherwise, we may miss update events.
readySub, err := node.DB.Events().Subscribe(event.P2PTopicCompletedName, event.ReplicatorCompletedName)
require.NoError(s.t, err)
waitLen := 0
cols, err := node.DB.GetAllP2PCollections(s.ctx)
require.NoError(s.t, err)
if len(cols) > 0 {
// there is only one message for loading of P2P collections
waitLen++
}
reps, err := node.DB.GetAllReplicators(s.ctx)
require.NoError(s.t, err)
// there is one message per replicator
waitLen += len(reps)

// We need to make sure the node is configured with its old address, otherwise
// a new one may be selected and reconnection to it will fail.
var addresses []string
Expand Down Expand Up @@ -713,6 +729,14 @@ func restartNodes(
sub, err := c.Events().Subscribe(event.MergeCompleteName)
require.NoError(s.t, err)
s.eventSubs[i] = sub
for waitLen > 0 {
select {
case <-readySub.Message():
waitLen--
case <-time.After(10 * time.Second):
s.t.Fatalf("timeout waiting for node to be ready")
}
}
}

// The index of the action after the last wait action before the current restart action.
Expand Down

0 comments on commit 6a5071e

Please sign in to comment.