Skip to content

Commit

Permalink
SOL-98567: Tweaked rebalance delay for partition queue to be 15secs
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Morgan <[email protected]>
  • Loading branch information
cjwmorgan-sol committed Nov 17, 2023
1 parent 63dcf9f commit 8db7443
Showing 1 changed file with 16 additions and 12 deletions.
28 changes: 16 additions & 12 deletions test/partitioned_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ const PQLabel string = "partition_queue"
var _ = Describe("Partitioned Queue Tests", func() {
var queueName string = "partitioned_queue_test"
var topicName string = "partitioned_queue_topic_test"
var rebalanceDelay int64 = 10
var rebalanceDelay int64 = 15
var partitionCount int32 = 3
Context("queue has three partitions and rebalance delay of 10 seconds", func() {
Context("queue has three partitions and rebalance delay of 15 seconds", func() {
BeforeEach(func() {
helpers.CreatePartitionedQueue(queueName, partitionCount, rebalanceDelay, topicName)
})
Expand All @@ -56,6 +56,7 @@ var _ = Describe("Partitioned Queue Tests", func() {
It("should have at least one key assigned to each partition and same keyed messages go to same partition", Label(PQLabel), func() {
var messagingServices [4]solace.MessagingService
var partitionKeys [9]string
var rebalanceDelayDuration = time.Duration(rebalanceDelay) * 2

//generate partition keys
for i := 0; i < 9; i++ {
Expand Down Expand Up @@ -96,7 +97,7 @@ var _ = Describe("Partitioned Queue Tests", func() {
publisher.Publish(msg, resource.TopicOf(topicName), nil, nil)
}

publisher.Terminate(5 * time.Second)
publisher.Terminate(rebalanceDelayDuration * time.Second)

messageHandler := func(message message.InboundMessage) {
fmt.Println("message received")
Expand All @@ -113,21 +114,21 @@ var _ = Describe("Partitioned Queue Tests", func() {

Eventually(func() uint64 {
return receiverOneMetrics.GetValue(metrics.PersistentMessagesReceived)
}).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 2))
}).WithTimeout(rebalanceDelayDuration * time.Second).Should(BeNumerically(">=", 2))

Eventually(func() uint64 {
return receiverTwoMetrics.GetValue(metrics.PersistentMessagesReceived)
}).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 2))
}).WithTimeout(rebalanceDelayDuration * time.Second).Should(BeNumerically(">=", 2))

Eventually(func() uint64 {
return receiverThreeMetrics.GetValue(metrics.PersistentMessagesReceived)
}).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 2))
}).WithTimeout(rebalanceDelayDuration * time.Second).Should(BeNumerically(">=", 2))

Eventually(func() uint64 {
totalMessagesReceived := receiverOneMetrics.
GetValue(metrics.PersistentMessagesReceived) + receiverTwoMetrics.GetValue(metrics.PersistentMessagesReceived) + receiverThreeMetrics.GetValue(metrics.PersistentMessagesReceived)
return totalMessagesReceived
}).WithTimeout(10 * time.Second).Should(Equal(publisherMetrics.GetValue(metrics.TotalMessagesSent)))
}).WithTimeout(rebalanceDelayDuration * time.Second).Should(Equal(publisherMetrics.GetValue(metrics.TotalMessagesSent)))

Expect(receiverOne.Terminate(10 * time.Second)).ToNot(HaveOccurred())
Expect(receiverTwo.Terminate(10 * time.Second)).ToNot(HaveOccurred())
Expand All @@ -140,6 +141,8 @@ var _ = Describe("Partitioned Queue Tests", func() {
var listenerTwo solace.ReceiverStateChangeListener
var listenerThree solace.ReceiverStateChangeListener

var rebalanceDelayDuration = time.Duration(rebalanceDelay) * 2

var messagingServices [3]solace.MessagingService

partitionedQueue := resource.QueueDurableNonExclusive(queueName)
Expand Down Expand Up @@ -187,7 +190,7 @@ var _ = Describe("Partitioned Queue Tests", func() {
Expect(receiverTwo.Start()).ToNot(HaveOccurred())
Expect(receiverThree.Start()).ToNot(HaveOccurred())

time.Sleep(10 * time.Second)
time.Sleep(rebalanceDelayDuration * time.Second)

testcontext.SEMP().Config().QueueApi.UpdateMsgVpnQueue(
testcontext.SEMP().ConfigCtx(),
Expand All @@ -199,15 +202,15 @@ var _ = Describe("Partitioned Queue Tests", func() {
nil,
)

Eventually(ch).WithTimeout(10 * time.Second).Should(BeClosed())
Eventually(ch).WithTimeout(rebalanceDelayDuration * time.Second).Should(BeClosed())

Expect(receiverOne.Terminate(10 * time.Second)).ToNot(HaveOccurred())
Expect(receiverTwo.Terminate(10 * time.Second)).ToNot(HaveOccurred())
Expect(receiverThree.Terminate(10 * time.Second)).ToNot(HaveOccurred())
})
It("rebinds to same partition after reconnect within rebalance delay", Label(PQLabel), func() {
var connectionRetries uint = 5
var intervalDurationSec = time.Duration(1)
var intervalDurationSec = time.Duration(2)
var reconnectDurationTimoutSec = time.Duration(connectionRetries) * intervalDurationSec * 2 // should be about 10 secs
rebalanceDelayDuration := time.Duration(rebalanceDelay) * 2 // should be about 20 secs
var messagingServices [3]solace.MessagingService
Expand Down Expand Up @@ -291,7 +294,8 @@ var _ = Describe("Partitioned Queue Tests", func() {
return totalMessagesReceived
}).WithTimeout(rebalanceDelayDuration * time.Second).Should(Equal(publisherMetrics.GetValue(metrics.TotalMessagesSent)))

partitionKeysBeforeDisconnect := make([]string, len(receiverOnePartitionKeys))
numPartitionKeys := len(receiverOnePartitionKeys)
partitionKeysBeforeDisconnect := make([]string, numPartitionKeys)
copy(receiverOnePartitionKeys, partitionKeysBeforeDisconnect)
receiverOnePartitionKeys = receiverOnePartitionKeys[:0]

Expand All @@ -318,7 +322,7 @@ var _ = Describe("Partitioned Queue Tests", func() {
return totalMessagesReceived
}).WithTimeout(rebalanceDelayDuration * time.Second).Should(BeNumerically(">=", publisherMetrics.GetValue(metrics.TotalMessagesSent)))

partitionKeysAfterReconnection := make([]string, len(receiverOnePartitionKeys))
partitionKeysAfterReconnection := make([]string, numPartitionKeys)
copy(receiverOnePartitionKeys, partitionKeysAfterReconnection)

Expect(partitionKeysBeforeDisconnect).Should(Equal(partitionKeysAfterReconnection))
Expand Down

0 comments on commit 8db7443

Please sign in to comment.