From e2af1e974818fadd588a127c60aa664660d6bf92 Mon Sep 17 00:00:00 2001
From: Fred Carle <fredcarle@users.noreply.github.com>
Date: Tue, 25 Jun 2024 14:32:25 -0400
Subject: [PATCH] add completed events for pubsub and replicator

---
 event/event.go           |  4 ++++
 net/server.go            |  2 ++
 tests/integration/p2p.go | 42 +++++++++++++++++++++++++++++++++++-----
 3 files changed, 43 insertions(+), 5 deletions(-)

diff --git a/event/event.go b/event/event.go
index 5889577c7d..fa557cc03c 100644
--- a/event/event.go
+++ b/event/event.go
@@ -38,6 +38,10 @@ const (
 	PeerInfoName = Name("peer-info")
 	// ReplicatorName is the name of the replicator event.
 	ReplicatorName = Name("replicator")
+	// P2PTopicCompletedName is the name of the network p2p topic update completed event.
+	P2PTopicCompletedName = Name("p2p-topic-completed")
+	// ReplicatorCompletedName is the name of the replicator completed event.
+	ReplicatorCompletedName = Name("replicator-completed")
 )
 
 // Peer is an event that is published when
diff --git a/net/server.go b/net/server.go
index ed5ac306c8..3b4922fe5e 100644
--- a/net/server.go
+++ b/net/server.go
@@ -325,6 +325,7 @@ func (s *server) updatePubSubTopics(evt event.P2PTopic) {
 			log.ErrorContextE(s.peer.ctx, "Failed to remove pubsub topic.", err)
 		}
 	}
+	s.peer.bus.Publish(event.NewMessage(event.P2PTopicCompletedName, nil))
 }
 
 func (s *server) updateReplicators(evt event.Replicator) {
@@ -368,4 +369,5 @@ func (s *server) updateReplicators(evt event.Replicator) {
 			}
 		}
 	}
+	s.peer.bus.Publish(event.NewMessage(event.ReplicatorCompletedName, nil))
 }
diff --git a/tests/integration/p2p.go b/tests/integration/p2p.go
index d990a3d322..5796e143d6 100644
--- a/tests/integration/p2p.go
+++ b/tests/integration/p2p.go
@@ -288,7 +288,9 @@ func configureReplicator(
 	sourceNode := s.nodes[cfg.SourceNodeID]
 	targetNode := s.nodes[cfg.TargetNodeID]
 
-	err := sourceNode.SetReplicator(s.ctx, client.Replicator{
+	sub, err := sourceNode.Events().Subscribe(event.ReplicatorCompletedName)
+	require.NoError(s.t, err)
+	err = sourceNode.SetReplicator(s.ctx, client.Replicator{
 		Info: targetNode.PeerInfo(),
 	})
 
@@ -297,6 +299,11 @@ func configureReplicator(
 	if err == nil {
 		setupReplicatorWaitSync(s, 0, cfg)
 	}
+	for msg := range sub.Message() {
+		if msg.Name == event.ReplicatorCompletedName {
+			break
+		}
+	}
 }
 
 func deleteReplicator(
@@ -306,10 +313,17 @@ func deleteReplicator(
 	sourceNode := s.nodes[cfg.SourceNodeID]
 	targetNode := s.nodes[cfg.TargetNodeID]
 
-	err := sourceNode.DeleteReplicator(s.ctx, client.Replicator{
+	sub, err := sourceNode.Events().Subscribe(event.ReplicatorCompletedName)
+	require.NoError(s.t, err)
+	err = sourceNode.DeleteReplicator(s.ctx, client.Replicator{
 		Info: targetNode.PeerInfo(),
 	})
 	require.NoError(s.t, err)
+	for msg := range sub.Message() {
+		if msg.Name == event.ReplicatorCompletedName {
+			break
+		}
+	}
 }
 
 func setupReplicatorWaitSync(
@@ -390,14 +404,23 @@ func subscribeToCollection(
 		schemaRoots = append(schemaRoots, col.SchemaRoot())
 	}
 
-	err := n.AddP2PCollections(s.ctx, schemaRoots)
+	sub, err := n.Events().Subscribe(event.P2PTopicCompletedName)
+	require.NoError(s.t, err)
+
+	err = n.AddP2PCollections(s.ctx, schemaRoots)
 	expectedErrorRaised := AssertError(s.t, s.testCase.Description, err, action.ExpectedError)
 	assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised)
 
+	for msg := range sub.Message() {
+		if msg.Name == event.P2PTopicCompletedName {
+			break
+		}
+	}
+
 	// The `n.Peer.AddP2PCollections(colIDs)` call above is calling some asynchronous functions
 	// for the pubsub subscription and those functions can take a bit of time to complete,
 	// we need to make sure this has finished before progressing.
-	time.Sleep(100 * time.Millisecond)
+	time.Sleep(1 * time.Millisecond)
 }
 
 // unsubscribeToCollection removes the given collections from subscriptions on the given nodes.
@@ -420,10 +443,19 @@ func unsubscribeToCollection(
 		schemaRoots = append(schemaRoots, col.SchemaRoot())
 	}
 
-	err := n.RemoveP2PCollections(s.ctx, schemaRoots)
+	sub, err := n.Events().Subscribe(event.P2PTopicCompletedName)
+	require.NoError(s.t, err)
+
+	err = n.RemoveP2PCollections(s.ctx, schemaRoots)
 	expectedErrorRaised := AssertError(s.t, s.testCase.Description, err, action.ExpectedError)
 	assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised)
 
+	for msg := range sub.Message() {
+		if msg.Name == event.P2PTopicCompletedName {
+			break
+		}
+	}
+
 	// The `n.Peer.RemoveP2PCollections(colIDs)` call above is calling some asynchronous functions
 	// for the pubsub subscription and those functions can take a bit of time to complete,
 	// we need to make sure this has finished before progressing.