diff --git a/net/peer.go b/net/peer.go index 231abfb02a..bafd02fb92 100644 --- a/net/peer.go +++ b/net/peer.go @@ -219,6 +219,8 @@ func NewPeer( cancel: cancel, } + p.bus.Publish(event.NewMessage(event.PeerInfoName, event.PeerInfo{Info: p.PeerInfo()})) + p.server, err = newServer(p, options.GRPCDialOptions...) if err != nil { return nil, err @@ -231,8 +233,6 @@ func NewPeer( // Start all the internal workers/goroutines/loops that manage the P2P state. func (p *Peer) Start() error { - p.bus.Publish(event.NewMessage(event.PeerInfoName, event.PeerInfo{Info: p.PeerInfo()})) - // reconnect to known peers var wg sync.WaitGroup for _, id := range p.host.Peerstore().PeersWithAddrs() { diff --git a/tests/integration/utils2.go b/tests/integration/utils2.go index eca87772a1..98f02d8f62 100644 --- a/tests/integration/utils2.go +++ b/tests/integration/utils2.go @@ -686,6 +686,22 @@ func restartNodes( continue } + // We need to ensure that on restart, the node pubsub is configured before + // we continue with the test. Otherwise, we may miss update events. + readySub, err := node.DB.Events().Subscribe(event.P2PTopicCompletedName, event.ReplicatorCompletedName) + require.NoError(s.t, err) + waitLen := 0 + cols, err := node.DB.GetAllP2PCollections(s.ctx) + require.NoError(s.t, err) + if len(cols) > 0 { + // there is only one message for loading of P2P collections + waitLen++ + } + reps, err := node.DB.GetAllReplicators(s.ctx) + require.NoError(s.t, err) + // there is one message per replicator + waitLen += len(reps) + // We need to make sure the node is configured with its old address, otherwise // a new one may be selected and reconnnection to it will fail. var addresses []string @@ -713,6 +729,14 @@ func restartNodes( sub, err := c.Events().Subscribe(event.MergeCompleteName) require.NoError(s.t, err) s.eventSubs[i] = sub + for waitLen > 0 { + select { + case <-readySub.Message(): + waitLen-- + case <-time.After(10 * time.Second): + s.t.Fatalf("timeout waiting for node to be ready") + } + } } // The index of the action after the last wait action before the current restart action.