From 63dcf9fb08195c70dc121b449cb63fc0e7ceb932 Mon Sep 17 00:00:00 2001 From: Chris Morgan Date: Wed, 15 Nov 2023 14:08:34 -0500 Subject: [PATCH 1/2] SOL-98567: Adjust rebalance delay for partitioned queue tests Fixed issue were PQ test does nto wait for received messages Signed-off-by: Chris Morgan --- test/helpers/resource_helpers.go | 22 +- test/partitioned_queue_test.go | 605 ++++++++++++++++--------------- 2 files changed, 317 insertions(+), 310 deletions(-) diff --git a/test/helpers/resource_helpers.go b/test/helpers/resource_helpers.go index eaf2d95..8c8d3a9 100644 --- a/test/helpers/resource_helpers.go +++ b/test/helpers/resource_helpers.go @@ -64,17 +64,17 @@ func CreateNonExclusiveQueue(queueName string, topics ...string) { // CreatePartitionedQueue function func CreatePartitionedQueue(queueName string, partitionCount int32, partitionRebalanceDelay int64, topics ...string) { - _, _, err := testcontext.SEMP().Config().QueueApi.CreateMsgVpnQueue(testcontext.SEMP() .ConfigCtx(), sempconfig.MsgVpnQueue{ - QueueName: queueName, - AccessType: "non-exclusive", - Permission: "modify-topic", - IngressEnabled: True, - EgressEnabled: True, - PartitionCount: partitionCount, - PartitionRebalanceDelay: partitionRebalanceDelay, - Owner: "default", - }, testcontext.Messaging().VPN, nil) - ExpectWithOffset(1, err).ToNot(HaveOccurred(), "Failed to create queue with name "+queueName) + _, _, err := testcontext.SEMP().Config().QueueApi.CreateMsgVpnQueue(testcontext.SEMP().ConfigCtx(), sempconfig.MsgVpnQueue{ + QueueName: queueName, + AccessType: "non-exclusive", + Permission: "modify-topic", + IngressEnabled: True, + EgressEnabled: True, + PartitionCount: partitionCount, + PartitionRebalanceDelay: partitionRebalanceDelay, + Owner: "default", + }, testcontext.Messaging().VPN, nil) + ExpectWithOffset(1, err).ToNot(HaveOccurred(), "Failed to create queue with name "+queueName) for _, topic := range topics { _, _, err = testcontext.SEMP().Config().QueueApi.CreateMsgVpnQueueSubscription(testcontext.SEMP().ConfigCtx(), sempconfig.MsgVpnQueueSubscription{ diff --git a/test/partitioned_queue_test.go b/test/partitioned_queue_test.go index 9009560..2b64c21 100644 --- a/test/partitioned_queue_test.go +++ b/test/partitioned_queue_test.go @@ -17,306 +17,313 @@ package test import ( - "time" - "fmt" - "strconv" - - "solace.dev/go/messaging" - "solace.dev/go/messaging/pkg/solace" - "solace.dev/go/messaging/pkg/solace/config" - "solace.dev/go/messaging/pkg/solace/metrics" - "solace.dev/go/messaging/pkg/solace/resource" - //"solace.dev/go/messaging/pkg/solace/subcode" - "solace.dev/go/messaging/test/helpers" - "solace.dev/go/messaging/test/testcontext" - "solace.dev/go/messaging/pkg/solace/message" - - sempconfig "solace.dev/go/messaging/test/sempclient/config" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" + "fmt" + "strconv" + "time" + + "solace.dev/go/messaging" + "solace.dev/go/messaging/pkg/solace" + "solace.dev/go/messaging/pkg/solace/config" + "solace.dev/go/messaging/pkg/solace/metrics" + "solace.dev/go/messaging/pkg/solace/resource" + //"solace.dev/go/messaging/pkg/solace/subcode" + "solace.dev/go/messaging/pkg/solace/message" + "solace.dev/go/messaging/test/helpers" + "solace.dev/go/messaging/test/testcontext" + + sempconfig "solace.dev/go/messaging/test/sempclient/config" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" ) +const PQLabel string = "partition_queue" + var _ = Describe("Partitioned Queue Tests", func() { - var queueName string = "partitioned_queue_test" - var topicName string = "partitioned_queue_topic_test" - var rebalanceDelay int64 = 5 - var partitionCount int32 = 3 - Context("queue has three partitions and rebalance delay of 1 second", func() { - BeforeEach(func() { - helpers.CreatePartitionedQueue(queueName, partitionCount, rebalanceDelay, topicName) - }) - - AfterEach(func() { - helpers.DeleteQueue(queueName) - }) - - It("should have at least one key assigned to each partition and same keyed messages go to same partition", func() { - var messagingServices[4]solace.MessagingService - var partitionKeys[9]string - - //generate partition keys - for i := 0; i < 9; i++{ - partitionKeys[i] = "key_"+ strconv.Itoa(i) - } - - for i := 0; i < 4; i++{ - messagingServices[i] = helpers.BuildMessagingService(messaging.NewMessagingServiceBuilder().FromConfigurationProvider(helpers.DefaultConfiguration())) - helpers.ConnectMessagingService(messagingServices[i]) - } - - defer func() { - for i := 0; i < 4; i++{ - helpers.DisconnectMessagingService(messagingServices[i]) - } - }() - - - partitionedQueue := resource.QueueDurableNonExclusive(queueName) - publisher := helpers.NewPersistentPublisher(messagingServices[0]) - - - receiverOne, _ := messagingServices[1].CreatePersistentMessageReceiverBuilder(). - WithSubscriptions(resource.TopicSubscriptionOf(topicName)).Build(partitionedQueue) - - receiverTwo, _ := messagingServices[2].CreatePersistentMessageReceiverBuilder(). - WithSubscriptions(resource.TopicSubscriptionOf(topicName)).Build(partitionedQueue) - - receiverThree, _ := messagingServices[3].CreatePersistentMessageReceiverBuilder(). - WithSubscriptions(resource.TopicSubscriptionOf(topicName)).Build(partitionedQueue) - - publisher.Start() - receiverOne.Start() - receiverTwo.Start() - receiverThree.Start() - - messageBuilder := messagingServices[0].MessageBuilder() - for i := 0; i < 18; i++{ - msg, _ := messageBuilder.WithProperty(config.MessageProperty(config.QueuePartitionKey), partitionKeys[i % 9]).BuildWithStringPayload("Hi Solace") - publisher.Publish(msg, resource.TopicOf(topicName), nil, nil) - } - - publisher.Terminate(5 * time.Second) - - messageHandler := func(message message.InboundMessage) { - fmt.Println("message received") - } - - receiverOne.ReceiveAsync(messageHandler) - receiverTwo.ReceiveAsync(messageHandler) - receiverThree.ReceiveAsync(messageHandler) - - publisherMetrics := messagingServices[0].Metrics() - receiverOneMetrics := messagingServices[1].Metrics() - receiverTwoMetrics := messagingServices[2].Metrics() - receiverThreeMetrics := messagingServices[3].Metrics() - - Eventually(func() uint64 { - return receiverOneMetrics.GetValue(metrics.PersistentMessagesReceived) - }).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 2)) - - Eventually(func() uint64 { - return receiverTwoMetrics.GetValue(metrics.PersistentMessagesReceived) - }).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 2)) - - Eventually(func() uint64 { - return receiverThreeMetrics.GetValue(metrics.PersistentMessagesReceived) - }).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 2)) - - Eventually( func() uint64 { - totalMessagesReceived := receiverOneMetrics. - GetValue(metrics.PersistentMessagesReceived) + receiverTwoMetrics.GetValue(metrics.PersistentMessagesReceived) + receiverThreeMetrics.GetValue(metrics.PersistentMessagesReceived) - return totalMessagesReceived - }).WithTimeout(10 * time.Second).Should(Equal(publisherMetrics.GetValue(metrics.TotalMessagesSent))) - - Expect(receiverOne.Terminate(10 * time.Second)).ToNot(HaveOccurred()) - Expect(receiverTwo.Terminate(10 * time.Second)).ToNot(HaveOccurred()) - Expect(receiverThree.Terminate(10 * time.Second)).ToNot(HaveOccurred()) - }) - - It("generates flow inactive event when no partitions left for consumer to bind", func() { - - var listenerOne solace.ReceiverStateChangeListener - var listenerTwo solace.ReceiverStateChangeListener - var listenerThree solace.ReceiverStateChangeListener - - var messagingServices[3]solace.MessagingService - - partitionedQueue := resource.QueueDurableNonExclusive(queueName) - - for i := 0; i < 3; i++{ - messagingServices[i] = helpers.BuildMessagingService(messaging.NewMessagingServiceBuilder(). - FromConfigurationProvider(helpers.DefaultConfiguration())) - helpers.ConnectMessagingService(messagingServices[i]) - } - - defer func() { - for i := 0; i < 3; i++{ - helpers.DisconnectMessagingService(messagingServices[i]) - } - }() - //activeStateTransitions refer to start-up induced state changes - //(i.e., receiver transition from passive to active whereas passive transitions are induced on partition downscale - activeStateTransitions, passiveStateTransitions := 0, 0 - ch := make(chan struct{}) - - passiveTransitionIncrementor := func(oldState, newState solace.ReceiverState, timestamp time.Time) { - if oldState == solace.ReceiverActive && newState == solace.ReceiverPassive { - passiveStateTransitions++ - } else { - activeStateTransitions++ - } - - if passiveStateTransitions == 2 && activeStateTransitions == 3 { - close(ch) - } - } - - listenerOne, listenerTwo, listenerThree = passiveTransitionIncrementor, passiveTransitionIncrementor, passiveTransitionIncrementor - - receiverOne, _ := messagingServices[0].CreatePersistentMessageReceiverBuilder(). - WithSubscriptions(resource.TopicSubscriptionOf(topicName)).WithActivationPassivationSupport(listenerOne).Build(partitionedQueue) - - receiverTwo, _ := messagingServices[1].CreatePersistentMessageReceiverBuilder(). - WithSubscriptions(resource.TopicSubscriptionOf(topicName)).WithActivationPassivationSupport(listenerTwo).Build(partitionedQueue) - - receiverThree, _ := messagingServices[2].CreatePersistentMessageReceiverBuilder(). - WithSubscriptions(resource.TopicSubscriptionOf(topicName)).WithActivationPassivationSupport(listenerThree).Build(partitionedQueue) - - Expect(receiverOne.Start()).ToNot(HaveOccurred()) - Expect(receiverTwo.Start()).ToNot(HaveOccurred()) - Expect(receiverThree.Start()).ToNot(HaveOccurred()) - - time.Sleep(10 * time.Second) - - testcontext.SEMP().Config().QueueApi.UpdateMsgVpnQueue( - testcontext.SEMP().ConfigCtx(), - sempconfig.MsgVpnQueue{ - PartitionCount: 1, - }, - testcontext.Messaging().VPN, - queueName, - nil, - ) - - Eventually(ch).WithTimeout(10 * time.Second).Should(BeClosed()) - - Expect(receiverOne.Terminate(10 * time.Second)).ToNot(HaveOccurred()) - Expect(receiverTwo.Terminate(10 * time.Second)).ToNot(HaveOccurred()) - Expect(receiverThree.Terminate(10 * time.Second)).ToNot(HaveOccurred()) - }) - It("rebinds to same partition after reconnect within rebalance delay", func () { - var messagingServices[3]solace.MessagingService - for i := 0; i < 3; i++{ - if i == 2 { - messagingServices[i] = helpers.BuildMessagingService(messaging.NewMessagingServiceBuilder(). - FromConfigurationProvider(helpers.ToxicConfiguration())) - } else { - messagingServices[i] = helpers.BuildMessagingService(messaging.NewMessagingServiceBuilder(). - FromConfigurationProvider(helpers.DefaultConfiguration())) - } - - helpers.ConnectMessagingService(messagingServices[i]) - } - - defer func() { - for i := 0; i < 3; i++{ - helpers.DisconnectMessagingService(messagingServices[i]) - } - }() - - var partitionKeys[9]string - for i := 0; i < 9; i++{ - partitionKeys[i] = "key_"+ strconv.Itoa(i) - } - - publisher := helpers.NewPersistentPublisher(messagingServices[0]) - messageBuilder := messagingServices[0].MessageBuilder() - publisher.Start() - - publisherMetrics := messagingServices[0].Metrics() - publishMessages := func (firstConnectionAttempt bool) { - for i := 0; i < 18; i++{ - msg, _ := messageBuilder.WithProperty(config.MessageProperty(config.QueuePartitionKey), partitionKeys[i % 9]).BuildWithStringPayload("Hi Solace") - publisher.Publish(msg, resource.TopicOf(topicName), nil, nil) - } - - if firstConnectionAttempt { - Eventually(func() uint64 { - return publisherMetrics.GetValue(metrics.TotalMessagesSent) - }).WithTimeout(30 * time.Second).Should(BeNumerically("==", 18)) - } else { - Eventually(func() uint64 { - return publisherMetrics.GetValue(metrics.TotalMessagesSent) - }).WithTimeout(30 * time.Second).Should(BeNumerically("==", 36)) - } - } - - publishMessages(true) - - partitionedQueue := resource.QueueDurableNonExclusive(queueName) - receiverOne, _ := messagingServices[1].CreatePersistentMessageReceiverBuilder(). - WithSubscriptions(resource.TopicSubscriptionOf(topicName)).Build(partitionedQueue) - receiverOne.Start() - receiverOnePartitionKeys := make([]string, 0, 18) - receiverOneMessageHandler := func (message message.InboundMessage) { - partitionKey, _ := message.GetProperty("JMSXGroupID") - partitionKeyValue := fmt.Sprint(partitionKey) - receiverOnePartitionKeys = append(receiverOnePartitionKeys, partitionKeyValue) - } - - receiverTwo, _ := messagingServices[2].CreatePersistentMessageReceiverBuilder(). - WithSubscriptions(resource.TopicSubscriptionOf(topicName)).Build(partitionedQueue) - receiverTwo.Start() - receiverTwoMessageHandler := func (message message.InboundMessage){ - fmt.Println("Received message in receiverTwo") - } - - receiverOne.ReceiveAsync(receiverOneMessageHandler) - receiverTwo.ReceiveAsync(receiverTwoMessageHandler) - - receiverOneMetrics := messagingServices[1].Metrics() - receiverTwoMetrics := messagingServices[2].Metrics() - - Eventually( func() uint64 { - totalMessagesReceived := receiverOneMetrics.GetValue(metrics.PersistentMessagesReceived) + receiverTwoMetrics.GetValue(metrics.PersistentMessagesReceived) - return totalMessagesReceived - }).WithTimeout(30 * time.Second).Should(Equal(publisherMetrics.GetValue(metrics.TotalMessagesSent))) - - partitionKeysBeforeDisconnect := make([]string, len(receiverOnePartitionKeys)) - copy(receiverOnePartitionKeys, partitionKeysBeforeDisconnect) - receiverOnePartitionKeys = receiverOnePartitionKeys[:0] - - reconnectionListenerChan := make(chan struct{}) - messagingServices[2].AddReconnectionListener(func(even solace.ServiceEvent) { - close(reconnectionListenerChan) - }) - - reconnectAttemptListenerChan := make(chan struct{}) - messagingServices[2].AddReconnectionAttemptListener(func(event solace.ServiceEvent) { - testcontext.Toxi().SMF().Enable() - close(reconnectAttemptListenerChan) - }) - - //temporarily disconnect receiverTwo - testcontext.Toxi().SMF().Disable() - - Eventually(reconnectionListenerChan).WithTimeout(30 * time.Second).Should(BeClosed()) - - //republish messages - publishMessages(false) - - Eventually( func() uint64 { - totalMessagesReceived := receiverOneMetrics.GetValue(metrics.PersistentMessagesReceived) + receiverTwoMetrics.GetValue(metrics.PersistentMessagesReceived) - return totalMessagesReceived - }).WithTimeout(30 * time.Second).Should(BeNumerically(">=", publisherMetrics.GetValue(metrics.TotalMessagesSent))) - - partitionKeysAfterReconnection := make([]string, len(receiverOnePartitionKeys)) - copy(receiverOnePartitionKeys, partitionKeysAfterReconnection) - - Expect(partitionKeysBeforeDisconnect).Should(Equal(partitionKeysAfterReconnection)) - Expect(receiverOne.Terminate(10 * time.Second)).ToNot(HaveOccurred()) - Expect(receiverTwo.Terminate(10 * time.Second)).ToNot(HaveOccurred()) - }) - }) + var queueName string = "partitioned_queue_test" + var topicName string = "partitioned_queue_topic_test" + var rebalanceDelay int64 = 10 + var partitionCount int32 = 3 + Context("queue has three partitions and rebalance delay of 10 seconds", func() { + BeforeEach(func() { + helpers.CreatePartitionedQueue(queueName, partitionCount, rebalanceDelay, topicName) + }) + + AfterEach(func() { + helpers.DeleteQueue(queueName) + }) + + It("should have at least one key assigned to each partition and same keyed messages go to same partition", Label(PQLabel), func() { + var messagingServices [4]solace.MessagingService + var partitionKeys [9]string + + //generate partition keys + for i := 0; i < 9; i++ { + partitionKeys[i] = "key_" + strconv.Itoa(i) + } + + for i := 0; i < 4; i++ { + messagingServices[i] = helpers.BuildMessagingService(messaging.NewMessagingServiceBuilder().FromConfigurationProvider(helpers.DefaultConfiguration())) + helpers.ConnectMessagingService(messagingServices[i]) + } + + defer func() { + for i := 0; i < 4; i++ { + helpers.DisconnectMessagingService(messagingServices[i]) + } + }() + + partitionedQueue := resource.QueueDurableNonExclusive(queueName) + publisher := helpers.NewPersistentPublisher(messagingServices[0]) + + receiverOne, _ := messagingServices[1].CreatePersistentMessageReceiverBuilder(). + WithSubscriptions(resource.TopicSubscriptionOf(topicName)).Build(partitionedQueue) + + receiverTwo, _ := messagingServices[2].CreatePersistentMessageReceiverBuilder(). + WithSubscriptions(resource.TopicSubscriptionOf(topicName)).Build(partitionedQueue) + + receiverThree, _ := messagingServices[3].CreatePersistentMessageReceiverBuilder(). + WithSubscriptions(resource.TopicSubscriptionOf(topicName)).Build(partitionedQueue) + + publisher.Start() + receiverOne.Start() + receiverTwo.Start() + receiverThree.Start() + + messageBuilder := messagingServices[0].MessageBuilder() + for i := 0; i < 18; i++ { + msg, _ := messageBuilder.WithProperty(config.MessageProperty(config.QueuePartitionKey), partitionKeys[i%9]).BuildWithStringPayload("Hi Solace") + publisher.Publish(msg, resource.TopicOf(topicName), nil, nil) + } + + publisher.Terminate(5 * time.Second) + + messageHandler := func(message message.InboundMessage) { + fmt.Println("message received") + } + + receiverOne.ReceiveAsync(messageHandler) + receiverTwo.ReceiveAsync(messageHandler) + receiverThree.ReceiveAsync(messageHandler) + + publisherMetrics := messagingServices[0].Metrics() + receiverOneMetrics := messagingServices[1].Metrics() + receiverTwoMetrics := messagingServices[2].Metrics() + receiverThreeMetrics := messagingServices[3].Metrics() + + Eventually(func() uint64 { + return receiverOneMetrics.GetValue(metrics.PersistentMessagesReceived) + }).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 2)) + + Eventually(func() uint64 { + return receiverTwoMetrics.GetValue(metrics.PersistentMessagesReceived) + }).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 2)) + + Eventually(func() uint64 { + return receiverThreeMetrics.GetValue(metrics.PersistentMessagesReceived) + }).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 2)) + + Eventually(func() uint64 { + totalMessagesReceived := receiverOneMetrics. + GetValue(metrics.PersistentMessagesReceived) + receiverTwoMetrics.GetValue(metrics.PersistentMessagesReceived) + receiverThreeMetrics.GetValue(metrics.PersistentMessagesReceived) + return totalMessagesReceived + }).WithTimeout(10 * time.Second).Should(Equal(publisherMetrics.GetValue(metrics.TotalMessagesSent))) + + Expect(receiverOne.Terminate(10 * time.Second)).ToNot(HaveOccurred()) + Expect(receiverTwo.Terminate(10 * time.Second)).ToNot(HaveOccurred()) + Expect(receiverThree.Terminate(10 * time.Second)).ToNot(HaveOccurred()) + }) + + It("generates flow inactive event when no partitions left for consumer to bind", Label(PQLabel), func() { + + var listenerOne solace.ReceiverStateChangeListener + var listenerTwo solace.ReceiverStateChangeListener + var listenerThree solace.ReceiverStateChangeListener + + var messagingServices [3]solace.MessagingService + + partitionedQueue := resource.QueueDurableNonExclusive(queueName) + + for i := 0; i < 3; i++ { + messagingServices[i] = helpers.BuildMessagingService(messaging.NewMessagingServiceBuilder(). + FromConfigurationProvider(helpers.DefaultConfiguration())) + helpers.ConnectMessagingService(messagingServices[i]) + } + + defer func() { + for i := 0; i < 3; i++ { + helpers.DisconnectMessagingService(messagingServices[i]) + } + }() + //activeStateTransitions refer to start-up induced state changes + //(i.e., receiver transition from passive to active whereas passive transitions are induced on partition downscale + activeStateTransitions, passiveStateTransitions := 0, 0 + ch := make(chan struct{}) + + passiveTransitionIncrementor := func(oldState, newState solace.ReceiverState, timestamp time.Time) { + if oldState == solace.ReceiverActive && newState == solace.ReceiverPassive { + passiveStateTransitions++ + } else { + activeStateTransitions++ + } + + if passiveStateTransitions == 2 && activeStateTransitions == 3 { + close(ch) + } + } + + listenerOne, listenerTwo, listenerThree = passiveTransitionIncrementor, passiveTransitionIncrementor, passiveTransitionIncrementor + + receiverOne, _ := messagingServices[0].CreatePersistentMessageReceiverBuilder(). + WithSubscriptions(resource.TopicSubscriptionOf(topicName)).WithActivationPassivationSupport(listenerOne).Build(partitionedQueue) + + receiverTwo, _ := messagingServices[1].CreatePersistentMessageReceiverBuilder(). + WithSubscriptions(resource.TopicSubscriptionOf(topicName)).WithActivationPassivationSupport(listenerTwo).Build(partitionedQueue) + + receiverThree, _ := messagingServices[2].CreatePersistentMessageReceiverBuilder(). + WithSubscriptions(resource.TopicSubscriptionOf(topicName)).WithActivationPassivationSupport(listenerThree).Build(partitionedQueue) + + Expect(receiverOne.Start()).ToNot(HaveOccurred()) + Expect(receiverTwo.Start()).ToNot(HaveOccurred()) + Expect(receiverThree.Start()).ToNot(HaveOccurred()) + + time.Sleep(10 * time.Second) + + testcontext.SEMP().Config().QueueApi.UpdateMsgVpnQueue( + testcontext.SEMP().ConfigCtx(), + sempconfig.MsgVpnQueue{ + PartitionCount: 1, + }, + testcontext.Messaging().VPN, + queueName, + nil, + ) + + Eventually(ch).WithTimeout(10 * time.Second).Should(BeClosed()) + + Expect(receiverOne.Terminate(10 * time.Second)).ToNot(HaveOccurred()) + Expect(receiverTwo.Terminate(10 * time.Second)).ToNot(HaveOccurred()) + Expect(receiverThree.Terminate(10 * time.Second)).ToNot(HaveOccurred()) + }) + It("rebinds to same partition after reconnect within rebalance delay", Label(PQLabel), func() { + var connectionRetries uint = 5 + var intervalDurationSec = time.Duration(1) + var reconnectDurationTimoutSec = time.Duration(connectionRetries) * intervalDurationSec * 2 // should be about 10 secs + rebalanceDelayDuration := time.Duration(rebalanceDelay) * 2 // should be about 20 secs + var messagingServices [3]solace.MessagingService + for i := 0; i < 3; i++ { + messagingServices[i] = helpers.BuildMessagingService(messaging.NewMessagingServiceBuilder(). + FromConfigurationProvider(helpers.DefaultConfiguration()). + WithReconnectionRetryStrategy(config. + RetryStrategyParameterizedRetry(connectionRetries, intervalDurationSec*time.Second))) + + helpers.ConnectMessagingService(messagingServices[i]) + } + + defer func() { + for i := 0; i < 3; i++ { + helpers.DisconnectMessagingService(messagingServices[i]) + } + }() + + var partitionKeys [9]string + for i := 0; i < 9; i++ { + partitionKeys[i] = "key_" + strconv.Itoa(i) + } + + publisher := helpers.NewPersistentPublisher(messagingServices[0]) + messageBuilder := messagingServices[0].MessageBuilder() + publisher.Start() + + publisherMetrics := messagingServices[0].Metrics() + publishMessages := func(firstConnectionAttempt bool) { + for i := 0; i < 18; i++ { + msg, _ := messageBuilder.WithProperty(config.MessageProperty(config.QueuePartitionKey), partitionKeys[i%9]).BuildWithStringPayload("Hi Solace") + publisher.Publish(msg, resource.TopicOf(topicName), nil, nil) + } + + if firstConnectionAttempt { + Eventually(func() uint64 { + return publisherMetrics.GetValue(metrics.TotalMessagesSent) + }).WithTimeout(rebalanceDelayDuration * time.Second).Should(BeNumerically("==", 18)) + } else { + Eventually(func() uint64 { + return publisherMetrics.GetValue(metrics.TotalMessagesSent) + }).WithTimeout(rebalanceDelayDuration * time.Second).Should(BeNumerically("==", 36)) + } + } + + publishMessages(true) + + partitionedQueue := resource.QueueDurableNonExclusive(queueName) + receiverOne, _ := messagingServices[1].CreatePersistentMessageReceiverBuilder(). + WithSubscriptions(resource.TopicSubscriptionOf(topicName)).Build(partitionedQueue) + var receiverOneMessageCount uint64 = 0 + var receiverTwoMessageCount uint64 = 0 + + receiverOnePartitionKeys := make([]string, 0, 18) + receiverOneMessageHandler := func(message message.InboundMessage) { + // get user property "JMSXGroupID" for partition key + if partitionKey, present := message.GetProperty(config.QueuePartitionKey); present { + partitionKeyValue := partitionKey.(string) + receiverOnePartitionKeys = append(receiverOnePartitionKeys, partitionKeyValue) + } + // must count dispatched message after PKey is recorded to avoid racing with main go routine + receiverOneMessageCount += 1 + } + + receiverTwo, _ := messagingServices[2].CreatePersistentMessageReceiverBuilder(). + WithSubscriptions(resource.TopicSubscriptionOf(topicName)).Build(partitionedQueue) + receiverTwoMessageHandler := func(message message.InboundMessage) { + //fmt.Println("Received message in receiverTwo") + // count the received messages dispatched for the receiver + receiverTwoMessageCount += 1 + } + + receiverOne.ReceiveAsync(receiverOneMessageHandler) + receiverTwo.ReceiveAsync(receiverTwoMessageHandler) + + receiverOne.Start() + receiverTwo.Start() + + Eventually(func() uint64 { + totalMessagesReceived := receiverOneMessageCount + receiverTwoMessageCount + return totalMessagesReceived + }).WithTimeout(rebalanceDelayDuration * time.Second).Should(Equal(publisherMetrics.GetValue(metrics.TotalMessagesSent))) + + partitionKeysBeforeDisconnect := make([]string, len(receiverOnePartitionKeys)) + copy(receiverOnePartitionKeys, partitionKeysBeforeDisconnect) + receiverOnePartitionKeys = receiverOnePartitionKeys[:0] + + reconnectionListenerChan := make(chan struct{}) + messagingServices[2].AddReconnectionListener(func(even solace.ServiceEvent) { + close(reconnectionListenerChan) + }) + + reconnectAttemptListenerChan := make(chan struct{}) + messagingServices[2].AddReconnectionAttemptListener(func(event solace.ServiceEvent) { + close(reconnectAttemptListenerChan) + }) + + //temporarily disconnect receiverTwo + helpers.ForceDisconnectViaSEMPv2(messagingServices[2]) + + Eventually(reconnectionListenerChan).WithTimeout(reconnectDurationTimoutSec * time.Second).Should(BeClosed()) + + //republish messages + publishMessages(false) + + Eventually(func() uint64 { + totalMessagesReceived := receiverOneMessageCount + receiverTwoMessageCount + return totalMessagesReceived + }).WithTimeout(rebalanceDelayDuration * time.Second).Should(BeNumerically(">=", publisherMetrics.GetValue(metrics.TotalMessagesSent))) + + partitionKeysAfterReconnection := make([]string, len(receiverOnePartitionKeys)) + copy(receiverOnePartitionKeys, partitionKeysAfterReconnection) + + Expect(partitionKeysBeforeDisconnect).Should(Equal(partitionKeysAfterReconnection)) + Expect(receiverOne.Terminate(10 * time.Second)).ToNot(HaveOccurred()) + Expect(receiverTwo.Terminate(10 * time.Second)).ToNot(HaveOccurred()) + }) + }) }) From 8db7443eb802e4de7ea9228b254b7b2f0f9f1d13 Mon Sep 17 00:00:00 2001 From: Chris Morgan Date: Fri, 17 Nov 2023 10:13:27 -0500 Subject: [PATCH 2/2] SOL-98567: Tweaked rebalance delay for partition queue to be 15secs Signed-off-by: Chris Morgan --- test/partitioned_queue_test.go | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/test/partitioned_queue_test.go b/test/partitioned_queue_test.go index 2b64c21..1f57630 100644 --- a/test/partitioned_queue_test.go +++ b/test/partitioned_queue_test.go @@ -42,9 +42,9 @@ const PQLabel string = "partition_queue" var _ = Describe("Partitioned Queue Tests", func() { var queueName string = "partitioned_queue_test" var topicName string = "partitioned_queue_topic_test" - var rebalanceDelay int64 = 10 + var rebalanceDelay int64 = 15 var partitionCount int32 = 3 - Context("queue has three partitions and rebalance delay of 10 seconds", func() { + Context("queue has three partitions and rebalance delay of 15 seconds", func() { BeforeEach(func() { helpers.CreatePartitionedQueue(queueName, partitionCount, rebalanceDelay, topicName) }) @@ -56,6 +56,7 @@ var _ = Describe("Partitioned Queue Tests", func() { It("should have at least one key assigned to each partition and same keyed messages go to same partition", Label(PQLabel), func() { var messagingServices [4]solace.MessagingService var partitionKeys [9]string + var rebalanceDelayDuration = time.Duration(rebalanceDelay) * 2 //generate partition keys for i := 0; i < 9; i++ { @@ -96,7 +97,7 @@ var _ = Describe("Partitioned Queue Tests", func() { publisher.Publish(msg, resource.TopicOf(topicName), nil, nil) } - publisher.Terminate(5 * time.Second) + publisher.Terminate(rebalanceDelayDuration * time.Second) messageHandler := func(message message.InboundMessage) { fmt.Println("message received") @@ -113,21 +114,21 @@ var _ = Describe("Partitioned Queue Tests", func() { Eventually(func() uint64 { return receiverOneMetrics.GetValue(metrics.PersistentMessagesReceived) - }).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 2)) + }).WithTimeout(rebalanceDelayDuration * time.Second).Should(BeNumerically(">=", 2)) Eventually(func() uint64 { return receiverTwoMetrics.GetValue(metrics.PersistentMessagesReceived) - }).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 2)) + }).WithTimeout(rebalanceDelayDuration * time.Second).Should(BeNumerically(">=", 2)) Eventually(func() uint64 { return receiverThreeMetrics.GetValue(metrics.PersistentMessagesReceived) - }).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 2)) + }).WithTimeout(rebalanceDelayDuration * time.Second).Should(BeNumerically(">=", 2)) Eventually(func() uint64 { totalMessagesReceived := receiverOneMetrics. GetValue(metrics.PersistentMessagesReceived) + receiverTwoMetrics.GetValue(metrics.PersistentMessagesReceived) + receiverThreeMetrics.GetValue(metrics.PersistentMessagesReceived) return totalMessagesReceived - }).WithTimeout(10 * time.Second).Should(Equal(publisherMetrics.GetValue(metrics.TotalMessagesSent))) + }).WithTimeout(rebalanceDelayDuration * time.Second).Should(Equal(publisherMetrics.GetValue(metrics.TotalMessagesSent))) Expect(receiverOne.Terminate(10 * time.Second)).ToNot(HaveOccurred()) Expect(receiverTwo.Terminate(10 * time.Second)).ToNot(HaveOccurred()) @@ -140,6 +141,8 @@ var _ = Describe("Partitioned Queue Tests", func() { var listenerTwo solace.ReceiverStateChangeListener var listenerThree solace.ReceiverStateChangeListener + var rebalanceDelayDuration = time.Duration(rebalanceDelay) * 2 + var messagingServices [3]solace.MessagingService partitionedQueue := resource.QueueDurableNonExclusive(queueName) @@ -187,7 +190,7 @@ var _ = Describe("Partitioned Queue Tests", func() { Expect(receiverTwo.Start()).ToNot(HaveOccurred()) Expect(receiverThree.Start()).ToNot(HaveOccurred()) - time.Sleep(10 * time.Second) + time.Sleep(rebalanceDelayDuration * time.Second) testcontext.SEMP().Config().QueueApi.UpdateMsgVpnQueue( testcontext.SEMP().ConfigCtx(), @@ -199,7 +202,7 @@ var _ = Describe("Partitioned Queue Tests", func() { nil, ) - Eventually(ch).WithTimeout(10 * time.Second).Should(BeClosed()) + Eventually(ch).WithTimeout(rebalanceDelayDuration * time.Second).Should(BeClosed()) Expect(receiverOne.Terminate(10 * time.Second)).ToNot(HaveOccurred()) Expect(receiverTwo.Terminate(10 * time.Second)).ToNot(HaveOccurred()) @@ -207,7 +210,7 @@ var _ = Describe("Partitioned Queue Tests", func() { }) It("rebinds to same partition after reconnect within rebalance delay", Label(PQLabel), func() { var connectionRetries uint = 5 - var intervalDurationSec = time.Duration(1) + var intervalDurationSec = time.Duration(2) var reconnectDurationTimoutSec = time.Duration(connectionRetries) * intervalDurationSec * 2 // should be about 10 secs rebalanceDelayDuration := time.Duration(rebalanceDelay) * 2 // should be about 20 secs var messagingServices [3]solace.MessagingService @@ -291,7 +294,8 @@ var _ = Describe("Partitioned Queue Tests", func() { return totalMessagesReceived }).WithTimeout(rebalanceDelayDuration * time.Second).Should(Equal(publisherMetrics.GetValue(metrics.TotalMessagesSent))) - partitionKeysBeforeDisconnect := make([]string, len(receiverOnePartitionKeys)) + numPartitionKeys := len(receiverOnePartitionKeys) + partitionKeysBeforeDisconnect := make([]string, numPartitionKeys) copy(receiverOnePartitionKeys, partitionKeysBeforeDisconnect) receiverOnePartitionKeys = receiverOnePartitionKeys[:0] @@ -318,7 +322,7 @@ var _ = Describe("Partitioned Queue Tests", func() { return totalMessagesReceived }).WithTimeout(rebalanceDelayDuration * time.Second).Should(BeNumerically(">=", publisherMetrics.GetValue(metrics.TotalMessagesSent))) - partitionKeysAfterReconnection := make([]string, len(receiverOnePartitionKeys)) + partitionKeysAfterReconnection := make([]string, numPartitionKeys) copy(receiverOnePartitionKeys, partitionKeysAfterReconnection) Expect(partitionKeysBeforeDisconnect).Should(Equal(partitionKeysAfterReconnection))