From d27d2c42617ef3845831da342e2dcd5d665d6830 Mon Sep 17 00:00:00 2001 From: Clark Bains Date: Thu, 26 Oct 2023 11:41:15 -0400 Subject: [PATCH 01/12] CRE-705: Update to dynamically versioned test infrastructure --- Jenkinsfile | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index 7964fde..09d0061 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -22,6 +22,17 @@ currentBuild.rawBuild.getParent().setQuietPeriod(0) library 'jenkins-pipeline-library@main' -stage('Build') { - builder.goapi() -} +builder.goapi([ + "validationGoVer": 'auto-v1.17.x', + "buildCheckGoVer": 'auto-v1.17.x', + "getTestPermutations": { + List> permutations = [] + for (platform in [builder.LINUX_ARM, builder.DARWIN_X86_64, builder.LINUX_X86_64, builder.DARWIN_ARM, builder.LINUX_MUSL]) { + for (gover in ['auto-v1.17.x']) { + permutations << [platform, gover] + } + } + return permutations + } +]) + From 6d729d01ad9597500e7286e1787a6aebe939935f Mon Sep 17 00:00:00 2001 From: Clark Bains Date: Thu, 26 Oct 2023 11:42:14 -0400 Subject: [PATCH 02/12] CRE-705: Point at dev pipeline library --- Jenkinsfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Jenkinsfile b/Jenkinsfile index 09d0061..0fc3631 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -20,7 +20,7 @@ properties([ ]) currentBuild.rawBuild.getParent().setQuietPeriod(0) -library 'jenkins-pipeline-library@main' +library 'jenkins-pipeline-library@CRE-705' builder.goapi([ "validationGoVer": 'auto-v1.17.x', From 19a44364fb857edf8ed4c611e1671cc9fb9255a3 Mon Sep 17 00:00:00 2001 From: Clark Bains Date: Thu, 26 Oct 2023 13:04:55 -0400 Subject: [PATCH 03/12] Re-arrange Platform order, document go versions --- Jenkinsfile | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index 0fc3631..41b14ce 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -22,12 +22,23 @@ currentBuild.rawBuild.getParent().setQuietPeriod(0) library 'jenkins-pipeline-library@CRE-705' +/* + Go Version examples: + auto-v1.17.x: Latest patch of 1.17 release + auto-v1.17.2: Specific patch of 1.17 release + auto-v1.17.0: First release of 1.17 (Despite go versioning this as 1.17) + auto-latest: Most recent patch version of latest minor release + auto-previous: Most recent patch version of previous minor release + auto-2previous: Most recent patch version of second last minor release + + Adoption of new versions into these may be delayed. +*/ builder.goapi([ - "validationGoVer": 'auto-v1.17.x', "buildCheckGoVer": 'auto-v1.17.x', + "validationGoVer": 'auto-v1.17.x', "getTestPermutations": { List> permutations = [] - for (platform in [builder.LINUX_ARM, builder.DARWIN_X86_64, builder.LINUX_X86_64, builder.DARWIN_ARM, builder.LINUX_MUSL]) { + for (platform in [builder.LINUX_ARM, builder.LINUX_X86_64, builder.LINUX_MUSL, builder.DARWIN_X86_64, builder.DARWIN_ARM]) { for (gover in ['auto-v1.17.x']) { permutations << [platform, gover] } From 337429034b3676cbe847794ba58ee4ce2715b922 Mon Sep 17 00:00:00 2001 From: Clark Bains Date: Fri, 27 Oct 2023 11:10:10 -0400 Subject: [PATCH 04/12] Use latest and previous versions in go pipeline --- Jenkinsfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Jenkinsfile b/Jenkinsfile index 41b14ce..f67f687 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -39,7 +39,7 @@ builder.goapi([ "getTestPermutations": { List> permutations = [] for (platform in [builder.LINUX_ARM, builder.LINUX_X86_64, builder.LINUX_MUSL, builder.DARWIN_X86_64, builder.DARWIN_ARM]) { - for (gover in ['auto-v1.17.x']) { + for (gover in ['auto-latest', 'auto-previous']) { permutations << [platform, gover] } } From a13cfd6527f5bf4ce7b139222fbc2789a87ad7b0 Mon Sep 17 00:00:00 2001 From: Clark Bains Date: Fri, 27 Oct 2023 12:19:52 -0400 Subject: [PATCH 05/12] Trigger rebuild --- Jenkinsfile | 1 + 1 file changed, 1 insertion(+) diff --git a/Jenkinsfile b/Jenkinsfile index f67f687..cf47068 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -33,6 +33,7 @@ library 'jenkins-pipeline-library@CRE-705' Adoption of new versions into these may be delayed. */ + builder.goapi([ "buildCheckGoVer": 'auto-v1.17.x', "validationGoVer": 'auto-v1.17.x', From b44199c675d2cae2daa77fc5a1ea788e6b830986 Mon Sep 17 00:00:00 2001 From: Clark Bains Date: Fri, 27 Oct 2023 13:00:23 -0400 Subject: [PATCH 06/12] Trigger rebuild --- Jenkinsfile | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index cf47068..2dade3a 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -46,5 +46,4 @@ builder.goapi([ } return permutations } -]) - +]) \ No newline at end of file From e6ffcbac418ec26ec54b7e0621aa816640c2f26e Mon Sep 17 00:00:00 2001 From: Clark Bains Date: Fri, 27 Oct 2023 14:28:47 -0400 Subject: [PATCH 07/12] Trigger build --- Jenkinsfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Jenkinsfile b/Jenkinsfile index 2dade3a..ce7386c 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -46,4 +46,4 @@ builder.goapi([ } return permutations } -]) \ No newline at end of file +]) From 22114951a1d37a38a4ada6c94109434b1972ae90 Mon Sep 17 00:00:00 2001 From: Clark Bains Date: Fri, 27 Oct 2023 15:15:57 -0400 Subject: [PATCH 08/12] Revert to using JPL main --- Jenkinsfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Jenkinsfile b/Jenkinsfile index ce7386c..aade1bd 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -20,7 +20,7 @@ properties([ ]) currentBuild.rawBuild.getParent().setQuietPeriod(0) -library 'jenkins-pipeline-library@CRE-705' +library 'jenkins-pipeline-library@main' /* Go Version examples: From 3cd00a210f7dd483ec556857b18b09989a0775f4 Mon Sep 17 00:00:00 2001 From: Chris Morgan Date: Fri, 10 Nov 2023 10:23:12 -0500 Subject: [PATCH 09/12] SOL-106167: Updated go version to 1.21.x latest, this should be 1.21.4 as of this commit Signed-off-by: Chris Morgan --- .github/workflows/test.yml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d7b9556..b165a8a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -17,7 +17,10 @@ jobs: - name: Setup Go environment uses: actions/setup-go@v2.1.3 with: - go-version: '1.20' + go-version: '1.21' + check-latest: true + - name: Check Go Version + run: go version - name: Compiles run: go build ./... From 611af89ad30607812b41d07b70f9a5af3c974d74 Mon Sep 17 00:00:00 2001 From: Chris Morgan Date: Fri, 10 Nov 2023 10:42:24 -0500 Subject: [PATCH 10/12] SOL-106205: Added gitactions job to check go source compatibility check for go version 1.17.0 Signed-off-by: Chris Morgan --- .github/workflows/test.yml | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b165a8a..2f7011a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -5,7 +5,32 @@ on: [push, pull_request, workflow_dispatch] # A workflow run is made up of one or more jobs that can run sequentially or in parallel jobs: - # This workflow contains a single job called "build" + # This workflow contains multiple jobs + # this job sets up the oldest version of go to check lang compatibility + CompatibilityCheck: + runs-on: ubuntu-20.04 + #Steps for the compatiblity test + steps: + - uses: actions/checkout@v2 + - name: Setup Go Environment + uses: actions/setup-go@v4 + with: + go-version: '1.17.0' + - name: Check Go Version + run: go version + - name: Compiles + run: go build ./... + - name: Runs unit tests + if: ${{ success() }} + run: go test -coverprofile ./unitcoverage.out ./... + - name: Uploads artifacts + if: ${{ always() }} + uses: actions/upload-artifact@v2 + with: + path: | + ./unitcoverage.out + + # this job runs linux based tests Linux: # The type of runner that the job will run on runs-on: ubuntu-20.04 From 63dcf9fb08195c70dc121b449cb63fc0e7ceb932 Mon Sep 17 00:00:00 2001 From: Chris Morgan Date: Wed, 15 Nov 2023 14:08:34 -0500 Subject: [PATCH 11/12] SOL-98567: Adjust rebalance delay for partitioned queue tests Fixed issue were PQ test does nto wait for received messages Signed-off-by: Chris Morgan --- test/helpers/resource_helpers.go | 22 +- test/partitioned_queue_test.go | 605 ++++++++++++++++--------------- 2 files changed, 317 insertions(+), 310 deletions(-) diff --git a/test/helpers/resource_helpers.go b/test/helpers/resource_helpers.go index eaf2d95..8c8d3a9 100644 --- a/test/helpers/resource_helpers.go +++ b/test/helpers/resource_helpers.go @@ -64,17 +64,17 @@ func CreateNonExclusiveQueue(queueName string, topics ...string) { // CreatePartitionedQueue function func CreatePartitionedQueue(queueName string, partitionCount int32, partitionRebalanceDelay int64, topics ...string) { - _, _, err := testcontext.SEMP().Config().QueueApi.CreateMsgVpnQueue(testcontext.SEMP() .ConfigCtx(), sempconfig.MsgVpnQueue{ - QueueName: queueName, - AccessType: "non-exclusive", - Permission: "modify-topic", - IngressEnabled: True, - EgressEnabled: True, - PartitionCount: partitionCount, - PartitionRebalanceDelay: partitionRebalanceDelay, - Owner: "default", - }, testcontext.Messaging().VPN, nil) - ExpectWithOffset(1, err).ToNot(HaveOccurred(), "Failed to create queue with name "+queueName) + _, _, err := testcontext.SEMP().Config().QueueApi.CreateMsgVpnQueue(testcontext.SEMP().ConfigCtx(), sempconfig.MsgVpnQueue{ + QueueName: queueName, + AccessType: "non-exclusive", + Permission: "modify-topic", + IngressEnabled: True, + EgressEnabled: True, + PartitionCount: partitionCount, + PartitionRebalanceDelay: partitionRebalanceDelay, + Owner: "default", + }, testcontext.Messaging().VPN, nil) + ExpectWithOffset(1, err).ToNot(HaveOccurred(), "Failed to create queue with name "+queueName) for _, topic := range topics { _, _, err = testcontext.SEMP().Config().QueueApi.CreateMsgVpnQueueSubscription(testcontext.SEMP().ConfigCtx(), sempconfig.MsgVpnQueueSubscription{ diff --git a/test/partitioned_queue_test.go b/test/partitioned_queue_test.go index 9009560..2b64c21 100644 --- a/test/partitioned_queue_test.go +++ b/test/partitioned_queue_test.go @@ -17,306 +17,313 @@ package test import ( - "time" - "fmt" - "strconv" - - "solace.dev/go/messaging" - "solace.dev/go/messaging/pkg/solace" - "solace.dev/go/messaging/pkg/solace/config" - "solace.dev/go/messaging/pkg/solace/metrics" - "solace.dev/go/messaging/pkg/solace/resource" - //"solace.dev/go/messaging/pkg/solace/subcode" - "solace.dev/go/messaging/test/helpers" - "solace.dev/go/messaging/test/testcontext" - "solace.dev/go/messaging/pkg/solace/message" - - sempconfig "solace.dev/go/messaging/test/sempclient/config" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" + "fmt" + "strconv" + "time" + + "solace.dev/go/messaging" + "solace.dev/go/messaging/pkg/solace" + "solace.dev/go/messaging/pkg/solace/config" + "solace.dev/go/messaging/pkg/solace/metrics" + "solace.dev/go/messaging/pkg/solace/resource" + //"solace.dev/go/messaging/pkg/solace/subcode" + "solace.dev/go/messaging/pkg/solace/message" + "solace.dev/go/messaging/test/helpers" + "solace.dev/go/messaging/test/testcontext" + + sempconfig "solace.dev/go/messaging/test/sempclient/config" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" ) +const PQLabel string = "partition_queue" + var _ = Describe("Partitioned Queue Tests", func() { - var queueName string = "partitioned_queue_test" - var topicName string = "partitioned_queue_topic_test" - var rebalanceDelay int64 = 5 - var partitionCount int32 = 3 - Context("queue has three partitions and rebalance delay of 1 second", func() { - BeforeEach(func() { - helpers.CreatePartitionedQueue(queueName, partitionCount, rebalanceDelay, topicName) - }) - - AfterEach(func() { - helpers.DeleteQueue(queueName) - }) - - It("should have at least one key assigned to each partition and same keyed messages go to same partition", func() { - var messagingServices[4]solace.MessagingService - var partitionKeys[9]string - - //generate partition keys - for i := 0; i < 9; i++{ - partitionKeys[i] = "key_"+ strconv.Itoa(i) - } - - for i := 0; i < 4; i++{ - messagingServices[i] = helpers.BuildMessagingService(messaging.NewMessagingServiceBuilder().FromConfigurationProvider(helpers.DefaultConfiguration())) - helpers.ConnectMessagingService(messagingServices[i]) - } - - defer func() { - for i := 0; i < 4; i++{ - helpers.DisconnectMessagingService(messagingServices[i]) - } - }() - - - partitionedQueue := resource.QueueDurableNonExclusive(queueName) - publisher := helpers.NewPersistentPublisher(messagingServices[0]) - - - receiverOne, _ := messagingServices[1].CreatePersistentMessageReceiverBuilder(). - WithSubscriptions(resource.TopicSubscriptionOf(topicName)).Build(partitionedQueue) - - receiverTwo, _ := messagingServices[2].CreatePersistentMessageReceiverBuilder(). - WithSubscriptions(resource.TopicSubscriptionOf(topicName)).Build(partitionedQueue) - - receiverThree, _ := messagingServices[3].CreatePersistentMessageReceiverBuilder(). - WithSubscriptions(resource.TopicSubscriptionOf(topicName)).Build(partitionedQueue) - - publisher.Start() - receiverOne.Start() - receiverTwo.Start() - receiverThree.Start() - - messageBuilder := messagingServices[0].MessageBuilder() - for i := 0; i < 18; i++{ - msg, _ := messageBuilder.WithProperty(config.MessageProperty(config.QueuePartitionKey), partitionKeys[i % 9]).BuildWithStringPayload("Hi Solace") - publisher.Publish(msg, resource.TopicOf(topicName), nil, nil) - } - - publisher.Terminate(5 * time.Second) - - messageHandler := func(message message.InboundMessage) { - fmt.Println("message received") - } - - receiverOne.ReceiveAsync(messageHandler) - receiverTwo.ReceiveAsync(messageHandler) - receiverThree.ReceiveAsync(messageHandler) - - publisherMetrics := messagingServices[0].Metrics() - receiverOneMetrics := messagingServices[1].Metrics() - receiverTwoMetrics := messagingServices[2].Metrics() - receiverThreeMetrics := messagingServices[3].Metrics() - - Eventually(func() uint64 { - return receiverOneMetrics.GetValue(metrics.PersistentMessagesReceived) - }).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 2)) - - Eventually(func() uint64 { - return receiverTwoMetrics.GetValue(metrics.PersistentMessagesReceived) - }).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 2)) - - Eventually(func() uint64 { - return receiverThreeMetrics.GetValue(metrics.PersistentMessagesReceived) - }).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 2)) - - Eventually( func() uint64 { - totalMessagesReceived := receiverOneMetrics. - GetValue(metrics.PersistentMessagesReceived) + receiverTwoMetrics.GetValue(metrics.PersistentMessagesReceived) + receiverThreeMetrics.GetValue(metrics.PersistentMessagesReceived) - return totalMessagesReceived - }).WithTimeout(10 * time.Second).Should(Equal(publisherMetrics.GetValue(metrics.TotalMessagesSent))) - - Expect(receiverOne.Terminate(10 * time.Second)).ToNot(HaveOccurred()) - Expect(receiverTwo.Terminate(10 * time.Second)).ToNot(HaveOccurred()) - Expect(receiverThree.Terminate(10 * time.Second)).ToNot(HaveOccurred()) - }) - - It("generates flow inactive event when no partitions left for consumer to bind", func() { - - var listenerOne solace.ReceiverStateChangeListener - var listenerTwo solace.ReceiverStateChangeListener - var listenerThree solace.ReceiverStateChangeListener - - var messagingServices[3]solace.MessagingService - - partitionedQueue := resource.QueueDurableNonExclusive(queueName) - - for i := 0; i < 3; i++{ - messagingServices[i] = helpers.BuildMessagingService(messaging.NewMessagingServiceBuilder(). - FromConfigurationProvider(helpers.DefaultConfiguration())) - helpers.ConnectMessagingService(messagingServices[i]) - } - - defer func() { - for i := 0; i < 3; i++{ - helpers.DisconnectMessagingService(messagingServices[i]) - } - }() - //activeStateTransitions refer to start-up induced state changes - //(i.e., receiver transition from passive to active whereas passive transitions are induced on partition downscale - activeStateTransitions, passiveStateTransitions := 0, 0 - ch := make(chan struct{}) - - passiveTransitionIncrementor := func(oldState, newState solace.ReceiverState, timestamp time.Time) { - if oldState == solace.ReceiverActive && newState == solace.ReceiverPassive { - passiveStateTransitions++ - } else { - activeStateTransitions++ - } - - if passiveStateTransitions == 2 && activeStateTransitions == 3 { - close(ch) - } - } - - listenerOne, listenerTwo, listenerThree = passiveTransitionIncrementor, passiveTransitionIncrementor, passiveTransitionIncrementor - - receiverOne, _ := messagingServices[0].CreatePersistentMessageReceiverBuilder(). - WithSubscriptions(resource.TopicSubscriptionOf(topicName)).WithActivationPassivationSupport(listenerOne).Build(partitionedQueue) - - receiverTwo, _ := messagingServices[1].CreatePersistentMessageReceiverBuilder(). - WithSubscriptions(resource.TopicSubscriptionOf(topicName)).WithActivationPassivationSupport(listenerTwo).Build(partitionedQueue) - - receiverThree, _ := messagingServices[2].CreatePersistentMessageReceiverBuilder(). - WithSubscriptions(resource.TopicSubscriptionOf(topicName)).WithActivationPassivationSupport(listenerThree).Build(partitionedQueue) - - Expect(receiverOne.Start()).ToNot(HaveOccurred()) - Expect(receiverTwo.Start()).ToNot(HaveOccurred()) - Expect(receiverThree.Start()).ToNot(HaveOccurred()) - - time.Sleep(10 * time.Second) - - testcontext.SEMP().Config().QueueApi.UpdateMsgVpnQueue( - testcontext.SEMP().ConfigCtx(), - sempconfig.MsgVpnQueue{ - PartitionCount: 1, - }, - testcontext.Messaging().VPN, - queueName, - nil, - ) - - Eventually(ch).WithTimeout(10 * time.Second).Should(BeClosed()) - - Expect(receiverOne.Terminate(10 * time.Second)).ToNot(HaveOccurred()) - Expect(receiverTwo.Terminate(10 * time.Second)).ToNot(HaveOccurred()) - Expect(receiverThree.Terminate(10 * time.Second)).ToNot(HaveOccurred()) - }) - It("rebinds to same partition after reconnect within rebalance delay", func () { - var messagingServices[3]solace.MessagingService - for i := 0; i < 3; i++{ - if i == 2 { - messagingServices[i] = helpers.BuildMessagingService(messaging.NewMessagingServiceBuilder(). - FromConfigurationProvider(helpers.ToxicConfiguration())) - } else { - messagingServices[i] = helpers.BuildMessagingService(messaging.NewMessagingServiceBuilder(). - FromConfigurationProvider(helpers.DefaultConfiguration())) - } - - helpers.ConnectMessagingService(messagingServices[i]) - } - - defer func() { - for i := 0; i < 3; i++{ - helpers.DisconnectMessagingService(messagingServices[i]) - } - }() - - var partitionKeys[9]string - for i := 0; i < 9; i++{ - partitionKeys[i] = "key_"+ strconv.Itoa(i) - } - - publisher := helpers.NewPersistentPublisher(messagingServices[0]) - messageBuilder := messagingServices[0].MessageBuilder() - publisher.Start() - - publisherMetrics := messagingServices[0].Metrics() - publishMessages := func (firstConnectionAttempt bool) { - for i := 0; i < 18; i++{ - msg, _ := messageBuilder.WithProperty(config.MessageProperty(config.QueuePartitionKey), partitionKeys[i % 9]).BuildWithStringPayload("Hi Solace") - publisher.Publish(msg, resource.TopicOf(topicName), nil, nil) - } - - if firstConnectionAttempt { - Eventually(func() uint64 { - return publisherMetrics.GetValue(metrics.TotalMessagesSent) - }).WithTimeout(30 * time.Second).Should(BeNumerically("==", 18)) - } else { - Eventually(func() uint64 { - return publisherMetrics.GetValue(metrics.TotalMessagesSent) - }).WithTimeout(30 * time.Second).Should(BeNumerically("==", 36)) - } - } - - publishMessages(true) - - partitionedQueue := resource.QueueDurableNonExclusive(queueName) - receiverOne, _ := messagingServices[1].CreatePersistentMessageReceiverBuilder(). - WithSubscriptions(resource.TopicSubscriptionOf(topicName)).Build(partitionedQueue) - receiverOne.Start() - receiverOnePartitionKeys := make([]string, 0, 18) - receiverOneMessageHandler := func (message message.InboundMessage) { - partitionKey, _ := message.GetProperty("JMSXGroupID") - partitionKeyValue := fmt.Sprint(partitionKey) - receiverOnePartitionKeys = append(receiverOnePartitionKeys, partitionKeyValue) - } - - receiverTwo, _ := messagingServices[2].CreatePersistentMessageReceiverBuilder(). - WithSubscriptions(resource.TopicSubscriptionOf(topicName)).Build(partitionedQueue) - receiverTwo.Start() - receiverTwoMessageHandler := func (message message.InboundMessage){ - fmt.Println("Received message in receiverTwo") - } - - receiverOne.ReceiveAsync(receiverOneMessageHandler) - receiverTwo.ReceiveAsync(receiverTwoMessageHandler) - - receiverOneMetrics := messagingServices[1].Metrics() - receiverTwoMetrics := messagingServices[2].Metrics() - - Eventually( func() uint64 { - totalMessagesReceived := receiverOneMetrics.GetValue(metrics.PersistentMessagesReceived) + receiverTwoMetrics.GetValue(metrics.PersistentMessagesReceived) - return totalMessagesReceived - }).WithTimeout(30 * time.Second).Should(Equal(publisherMetrics.GetValue(metrics.TotalMessagesSent))) - - partitionKeysBeforeDisconnect := make([]string, len(receiverOnePartitionKeys)) - copy(receiverOnePartitionKeys, partitionKeysBeforeDisconnect) - receiverOnePartitionKeys = receiverOnePartitionKeys[:0] - - reconnectionListenerChan := make(chan struct{}) - messagingServices[2].AddReconnectionListener(func(even solace.ServiceEvent) { - close(reconnectionListenerChan) - }) - - reconnectAttemptListenerChan := make(chan struct{}) - messagingServices[2].AddReconnectionAttemptListener(func(event solace.ServiceEvent) { - testcontext.Toxi().SMF().Enable() - close(reconnectAttemptListenerChan) - }) - - //temporarily disconnect receiverTwo - testcontext.Toxi().SMF().Disable() - - Eventually(reconnectionListenerChan).WithTimeout(30 * time.Second).Should(BeClosed()) - - //republish messages - publishMessages(false) - - Eventually( func() uint64 { - totalMessagesReceived := receiverOneMetrics.GetValue(metrics.PersistentMessagesReceived) + receiverTwoMetrics.GetValue(metrics.PersistentMessagesReceived) - return totalMessagesReceived - }).WithTimeout(30 * time.Second).Should(BeNumerically(">=", publisherMetrics.GetValue(metrics.TotalMessagesSent))) - - partitionKeysAfterReconnection := make([]string, len(receiverOnePartitionKeys)) - copy(receiverOnePartitionKeys, partitionKeysAfterReconnection) - - Expect(partitionKeysBeforeDisconnect).Should(Equal(partitionKeysAfterReconnection)) - Expect(receiverOne.Terminate(10 * time.Second)).ToNot(HaveOccurred()) - Expect(receiverTwo.Terminate(10 * time.Second)).ToNot(HaveOccurred()) - }) - }) + var queueName string = "partitioned_queue_test" + var topicName string = "partitioned_queue_topic_test" + var rebalanceDelay int64 = 10 + var partitionCount int32 = 3 + Context("queue has three partitions and rebalance delay of 10 seconds", func() { + BeforeEach(func() { + helpers.CreatePartitionedQueue(queueName, partitionCount, rebalanceDelay, topicName) + }) + + AfterEach(func() { + helpers.DeleteQueue(queueName) + }) + + It("should have at least one key assigned to each partition and same keyed messages go to same partition", Label(PQLabel), func() { + var messagingServices [4]solace.MessagingService + var partitionKeys [9]string + + //generate partition keys + for i := 0; i < 9; i++ { + partitionKeys[i] = "key_" + strconv.Itoa(i) + } + + for i := 0; i < 4; i++ { + messagingServices[i] = helpers.BuildMessagingService(messaging.NewMessagingServiceBuilder().FromConfigurationProvider(helpers.DefaultConfiguration())) + helpers.ConnectMessagingService(messagingServices[i]) + } + + defer func() { + for i := 0; i < 4; i++ { + helpers.DisconnectMessagingService(messagingServices[i]) + } + }() + + partitionedQueue := resource.QueueDurableNonExclusive(queueName) + publisher := helpers.NewPersistentPublisher(messagingServices[0]) + + receiverOne, _ := messagingServices[1].CreatePersistentMessageReceiverBuilder(). + WithSubscriptions(resource.TopicSubscriptionOf(topicName)).Build(partitionedQueue) + + receiverTwo, _ := messagingServices[2].CreatePersistentMessageReceiverBuilder(). + WithSubscriptions(resource.TopicSubscriptionOf(topicName)).Build(partitionedQueue) + + receiverThree, _ := messagingServices[3].CreatePersistentMessageReceiverBuilder(). + WithSubscriptions(resource.TopicSubscriptionOf(topicName)).Build(partitionedQueue) + + publisher.Start() + receiverOne.Start() + receiverTwo.Start() + receiverThree.Start() + + messageBuilder := messagingServices[0].MessageBuilder() + for i := 0; i < 18; i++ { + msg, _ := messageBuilder.WithProperty(config.MessageProperty(config.QueuePartitionKey), partitionKeys[i%9]).BuildWithStringPayload("Hi Solace") + publisher.Publish(msg, resource.TopicOf(topicName), nil, nil) + } + + publisher.Terminate(5 * time.Second) + + messageHandler := func(message message.InboundMessage) { + fmt.Println("message received") + } + + receiverOne.ReceiveAsync(messageHandler) + receiverTwo.ReceiveAsync(messageHandler) + receiverThree.ReceiveAsync(messageHandler) + + publisherMetrics := messagingServices[0].Metrics() + receiverOneMetrics := messagingServices[1].Metrics() + receiverTwoMetrics := messagingServices[2].Metrics() + receiverThreeMetrics := messagingServices[3].Metrics() + + Eventually(func() uint64 { + return receiverOneMetrics.GetValue(metrics.PersistentMessagesReceived) + }).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 2)) + + Eventually(func() uint64 { + return receiverTwoMetrics.GetValue(metrics.PersistentMessagesReceived) + }).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 2)) + + Eventually(func() uint64 { + return receiverThreeMetrics.GetValue(metrics.PersistentMessagesReceived) + }).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 2)) + + Eventually(func() uint64 { + totalMessagesReceived := receiverOneMetrics. + GetValue(metrics.PersistentMessagesReceived) + receiverTwoMetrics.GetValue(metrics.PersistentMessagesReceived) + receiverThreeMetrics.GetValue(metrics.PersistentMessagesReceived) + return totalMessagesReceived + }).WithTimeout(10 * time.Second).Should(Equal(publisherMetrics.GetValue(metrics.TotalMessagesSent))) + + Expect(receiverOne.Terminate(10 * time.Second)).ToNot(HaveOccurred()) + Expect(receiverTwo.Terminate(10 * time.Second)).ToNot(HaveOccurred()) + Expect(receiverThree.Terminate(10 * time.Second)).ToNot(HaveOccurred()) + }) + + It("generates flow inactive event when no partitions left for consumer to bind", Label(PQLabel), func() { + + var listenerOne solace.ReceiverStateChangeListener + var listenerTwo solace.ReceiverStateChangeListener + var listenerThree solace.ReceiverStateChangeListener + + var messagingServices [3]solace.MessagingService + + partitionedQueue := resource.QueueDurableNonExclusive(queueName) + + for i := 0; i < 3; i++ { + messagingServices[i] = helpers.BuildMessagingService(messaging.NewMessagingServiceBuilder(). + FromConfigurationProvider(helpers.DefaultConfiguration())) + helpers.ConnectMessagingService(messagingServices[i]) + } + + defer func() { + for i := 0; i < 3; i++ { + helpers.DisconnectMessagingService(messagingServices[i]) + } + }() + //activeStateTransitions refer to start-up induced state changes + //(i.e., receiver transition from passive to active whereas passive transitions are induced on partition downscale + activeStateTransitions, passiveStateTransitions := 0, 0 + ch := make(chan struct{}) + + passiveTransitionIncrementor := func(oldState, newState solace.ReceiverState, timestamp time.Time) { + if oldState == solace.ReceiverActive && newState == solace.ReceiverPassive { + passiveStateTransitions++ + } else { + activeStateTransitions++ + } + + if passiveStateTransitions == 2 && activeStateTransitions == 3 { + close(ch) + } + } + + listenerOne, listenerTwo, listenerThree = passiveTransitionIncrementor, passiveTransitionIncrementor, passiveTransitionIncrementor + + receiverOne, _ := messagingServices[0].CreatePersistentMessageReceiverBuilder(). + WithSubscriptions(resource.TopicSubscriptionOf(topicName)).WithActivationPassivationSupport(listenerOne).Build(partitionedQueue) + + receiverTwo, _ := messagingServices[1].CreatePersistentMessageReceiverBuilder(). + WithSubscriptions(resource.TopicSubscriptionOf(topicName)).WithActivationPassivationSupport(listenerTwo).Build(partitionedQueue) + + receiverThree, _ := messagingServices[2].CreatePersistentMessageReceiverBuilder(). + WithSubscriptions(resource.TopicSubscriptionOf(topicName)).WithActivationPassivationSupport(listenerThree).Build(partitionedQueue) + + Expect(receiverOne.Start()).ToNot(HaveOccurred()) + Expect(receiverTwo.Start()).ToNot(HaveOccurred()) + Expect(receiverThree.Start()).ToNot(HaveOccurred()) + + time.Sleep(10 * time.Second) + + testcontext.SEMP().Config().QueueApi.UpdateMsgVpnQueue( + testcontext.SEMP().ConfigCtx(), + sempconfig.MsgVpnQueue{ + PartitionCount: 1, + }, + testcontext.Messaging().VPN, + queueName, + nil, + ) + + Eventually(ch).WithTimeout(10 * time.Second).Should(BeClosed()) + + Expect(receiverOne.Terminate(10 * time.Second)).ToNot(HaveOccurred()) + Expect(receiverTwo.Terminate(10 * time.Second)).ToNot(HaveOccurred()) + Expect(receiverThree.Terminate(10 * time.Second)).ToNot(HaveOccurred()) + }) + It("rebinds to same partition after reconnect within rebalance delay", Label(PQLabel), func() { + var connectionRetries uint = 5 + var intervalDurationSec = time.Duration(1) + var reconnectDurationTimoutSec = time.Duration(connectionRetries) * intervalDurationSec * 2 // should be about 10 secs + rebalanceDelayDuration := time.Duration(rebalanceDelay) * 2 // should be about 20 secs + var messagingServices [3]solace.MessagingService + for i := 0; i < 3; i++ { + messagingServices[i] = helpers.BuildMessagingService(messaging.NewMessagingServiceBuilder(). + FromConfigurationProvider(helpers.DefaultConfiguration()). + WithReconnectionRetryStrategy(config. + RetryStrategyParameterizedRetry(connectionRetries, intervalDurationSec*time.Second))) + + helpers.ConnectMessagingService(messagingServices[i]) + } + + defer func() { + for i := 0; i < 3; i++ { + helpers.DisconnectMessagingService(messagingServices[i]) + } + }() + + var partitionKeys [9]string + for i := 0; i < 9; i++ { + partitionKeys[i] = "key_" + strconv.Itoa(i) + } + + publisher := helpers.NewPersistentPublisher(messagingServices[0]) + messageBuilder := messagingServices[0].MessageBuilder() + publisher.Start() + + publisherMetrics := messagingServices[0].Metrics() + publishMessages := func(firstConnectionAttempt bool) { + for i := 0; i < 18; i++ { + msg, _ := messageBuilder.WithProperty(config.MessageProperty(config.QueuePartitionKey), partitionKeys[i%9]).BuildWithStringPayload("Hi Solace") + publisher.Publish(msg, resource.TopicOf(topicName), nil, nil) + } + + if firstConnectionAttempt { + Eventually(func() uint64 { + return publisherMetrics.GetValue(metrics.TotalMessagesSent) + }).WithTimeout(rebalanceDelayDuration * time.Second).Should(BeNumerically("==", 18)) + } else { + Eventually(func() uint64 { + return publisherMetrics.GetValue(metrics.TotalMessagesSent) + }).WithTimeout(rebalanceDelayDuration * time.Second).Should(BeNumerically("==", 36)) + } + } + + publishMessages(true) + + partitionedQueue := resource.QueueDurableNonExclusive(queueName) + receiverOne, _ := messagingServices[1].CreatePersistentMessageReceiverBuilder(). + WithSubscriptions(resource.TopicSubscriptionOf(topicName)).Build(partitionedQueue) + var receiverOneMessageCount uint64 = 0 + var receiverTwoMessageCount uint64 = 0 + + receiverOnePartitionKeys := make([]string, 0, 18) + receiverOneMessageHandler := func(message message.InboundMessage) { + // get user property "JMSXGroupID" for partition key + if partitionKey, present := message.GetProperty(config.QueuePartitionKey); present { + partitionKeyValue := partitionKey.(string) + receiverOnePartitionKeys = append(receiverOnePartitionKeys, partitionKeyValue) + } + // must count dispatched message after PKey is recorded to avoid racing with main go routine + receiverOneMessageCount += 1 + } + + receiverTwo, _ := messagingServices[2].CreatePersistentMessageReceiverBuilder(). + WithSubscriptions(resource.TopicSubscriptionOf(topicName)).Build(partitionedQueue) + receiverTwoMessageHandler := func(message message.InboundMessage) { + //fmt.Println("Received message in receiverTwo") + // count the received messages dispatched for the receiver + receiverTwoMessageCount += 1 + } + + receiverOne.ReceiveAsync(receiverOneMessageHandler) + receiverTwo.ReceiveAsync(receiverTwoMessageHandler) + + receiverOne.Start() + receiverTwo.Start() + + Eventually(func() uint64 { + totalMessagesReceived := receiverOneMessageCount + receiverTwoMessageCount + return totalMessagesReceived + }).WithTimeout(rebalanceDelayDuration * time.Second).Should(Equal(publisherMetrics.GetValue(metrics.TotalMessagesSent))) + + partitionKeysBeforeDisconnect := make([]string, len(receiverOnePartitionKeys)) + copy(receiverOnePartitionKeys, partitionKeysBeforeDisconnect) + receiverOnePartitionKeys = receiverOnePartitionKeys[:0] + + reconnectionListenerChan := make(chan struct{}) + messagingServices[2].AddReconnectionListener(func(even solace.ServiceEvent) { + close(reconnectionListenerChan) + }) + + reconnectAttemptListenerChan := make(chan struct{}) + messagingServices[2].AddReconnectionAttemptListener(func(event solace.ServiceEvent) { + close(reconnectAttemptListenerChan) + }) + + //temporarily disconnect receiverTwo + helpers.ForceDisconnectViaSEMPv2(messagingServices[2]) + + Eventually(reconnectionListenerChan).WithTimeout(reconnectDurationTimoutSec * time.Second).Should(BeClosed()) + + //republish messages + publishMessages(false) + + Eventually(func() uint64 { + totalMessagesReceived := receiverOneMessageCount + receiverTwoMessageCount + return totalMessagesReceived + }).WithTimeout(rebalanceDelayDuration * time.Second).Should(BeNumerically(">=", publisherMetrics.GetValue(metrics.TotalMessagesSent))) + + partitionKeysAfterReconnection := make([]string, len(receiverOnePartitionKeys)) + copy(receiverOnePartitionKeys, partitionKeysAfterReconnection) + + Expect(partitionKeysBeforeDisconnect).Should(Equal(partitionKeysAfterReconnection)) + Expect(receiverOne.Terminate(10 * time.Second)).ToNot(HaveOccurred()) + Expect(receiverTwo.Terminate(10 * time.Second)).ToNot(HaveOccurred()) + }) + }) }) From 8db7443eb802e4de7ea9228b254b7b2f0f9f1d13 Mon Sep 17 00:00:00 2001 From: Chris Morgan Date: Fri, 17 Nov 2023 10:13:27 -0500 Subject: [PATCH 12/12] SOL-98567: Tweaked rebalance delay for partition queue to be 15secs Signed-off-by: Chris Morgan --- test/partitioned_queue_test.go | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/test/partitioned_queue_test.go b/test/partitioned_queue_test.go index 2b64c21..1f57630 100644 --- a/test/partitioned_queue_test.go +++ b/test/partitioned_queue_test.go @@ -42,9 +42,9 @@ const PQLabel string = "partition_queue" var _ = Describe("Partitioned Queue Tests", func() { var queueName string = "partitioned_queue_test" var topicName string = "partitioned_queue_topic_test" - var rebalanceDelay int64 = 10 + var rebalanceDelay int64 = 15 var partitionCount int32 = 3 - Context("queue has three partitions and rebalance delay of 10 seconds", func() { + Context("queue has three partitions and rebalance delay of 15 seconds", func() { BeforeEach(func() { helpers.CreatePartitionedQueue(queueName, partitionCount, rebalanceDelay, topicName) }) @@ -56,6 +56,7 @@ var _ = Describe("Partitioned Queue Tests", func() { It("should have at least one key assigned to each partition and same keyed messages go to same partition", Label(PQLabel), func() { var messagingServices [4]solace.MessagingService var partitionKeys [9]string + var rebalanceDelayDuration = time.Duration(rebalanceDelay) * 2 //generate partition keys for i := 0; i < 9; i++ { @@ -96,7 +97,7 @@ var _ = Describe("Partitioned Queue Tests", func() { publisher.Publish(msg, resource.TopicOf(topicName), nil, nil) } - publisher.Terminate(5 * time.Second) + publisher.Terminate(rebalanceDelayDuration * time.Second) messageHandler := func(message message.InboundMessage) { fmt.Println("message received") @@ -113,21 +114,21 @@ var _ = Describe("Partitioned Queue Tests", func() { Eventually(func() uint64 { return receiverOneMetrics.GetValue(metrics.PersistentMessagesReceived) - }).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 2)) + }).WithTimeout(rebalanceDelayDuration * time.Second).Should(BeNumerically(">=", 2)) Eventually(func() uint64 { return receiverTwoMetrics.GetValue(metrics.PersistentMessagesReceived) - }).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 2)) + }).WithTimeout(rebalanceDelayDuration * time.Second).Should(BeNumerically(">=", 2)) Eventually(func() uint64 { return receiverThreeMetrics.GetValue(metrics.PersistentMessagesReceived) - }).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 2)) + }).WithTimeout(rebalanceDelayDuration * time.Second).Should(BeNumerically(">=", 2)) Eventually(func() uint64 { totalMessagesReceived := receiverOneMetrics. GetValue(metrics.PersistentMessagesReceived) + receiverTwoMetrics.GetValue(metrics.PersistentMessagesReceived) + receiverThreeMetrics.GetValue(metrics.PersistentMessagesReceived) return totalMessagesReceived - }).WithTimeout(10 * time.Second).Should(Equal(publisherMetrics.GetValue(metrics.TotalMessagesSent))) + }).WithTimeout(rebalanceDelayDuration * time.Second).Should(Equal(publisherMetrics.GetValue(metrics.TotalMessagesSent))) Expect(receiverOne.Terminate(10 * time.Second)).ToNot(HaveOccurred()) Expect(receiverTwo.Terminate(10 * time.Second)).ToNot(HaveOccurred()) @@ -140,6 +141,8 @@ var _ = Describe("Partitioned Queue Tests", func() { var listenerTwo solace.ReceiverStateChangeListener var listenerThree solace.ReceiverStateChangeListener + var rebalanceDelayDuration = time.Duration(rebalanceDelay) * 2 + var messagingServices [3]solace.MessagingService partitionedQueue := resource.QueueDurableNonExclusive(queueName) @@ -187,7 +190,7 @@ var _ = Describe("Partitioned Queue Tests", func() { Expect(receiverTwo.Start()).ToNot(HaveOccurred()) Expect(receiverThree.Start()).ToNot(HaveOccurred()) - time.Sleep(10 * time.Second) + time.Sleep(rebalanceDelayDuration * time.Second) testcontext.SEMP().Config().QueueApi.UpdateMsgVpnQueue( testcontext.SEMP().ConfigCtx(), @@ -199,7 +202,7 @@ var _ = Describe("Partitioned Queue Tests", func() { nil, ) - Eventually(ch).WithTimeout(10 * time.Second).Should(BeClosed()) + Eventually(ch).WithTimeout(rebalanceDelayDuration * time.Second).Should(BeClosed()) Expect(receiverOne.Terminate(10 * time.Second)).ToNot(HaveOccurred()) Expect(receiverTwo.Terminate(10 * time.Second)).ToNot(HaveOccurred()) @@ -207,7 +210,7 @@ var _ = Describe("Partitioned Queue Tests", func() { }) It("rebinds to same partition after reconnect within rebalance delay", Label(PQLabel), func() { var connectionRetries uint = 5 - var intervalDurationSec = time.Duration(1) + var intervalDurationSec = time.Duration(2) var reconnectDurationTimoutSec = time.Duration(connectionRetries) * intervalDurationSec * 2 // should be about 10 secs rebalanceDelayDuration := time.Duration(rebalanceDelay) * 2 // should be about 20 secs var messagingServices [3]solace.MessagingService @@ -291,7 +294,8 @@ var _ = Describe("Partitioned Queue Tests", func() { return totalMessagesReceived }).WithTimeout(rebalanceDelayDuration * time.Second).Should(Equal(publisherMetrics.GetValue(metrics.TotalMessagesSent))) - partitionKeysBeforeDisconnect := make([]string, len(receiverOnePartitionKeys)) + numPartitionKeys := len(receiverOnePartitionKeys) + partitionKeysBeforeDisconnect := make([]string, numPartitionKeys) copy(receiverOnePartitionKeys, partitionKeysBeforeDisconnect) receiverOnePartitionKeys = receiverOnePartitionKeys[:0] @@ -318,7 +322,7 @@ var _ = Describe("Partitioned Queue Tests", func() { return totalMessagesReceived }).WithTimeout(rebalanceDelayDuration * time.Second).Should(BeNumerically(">=", publisherMetrics.GetValue(metrics.TotalMessagesSent))) - partitionKeysAfterReconnection := make([]string, len(receiverOnePartitionKeys)) + partitionKeysAfterReconnection := make([]string, numPartitionKeys) copy(receiverOnePartitionKeys, partitionKeysAfterReconnection) Expect(partitionKeysBeforeDisconnect).Should(Equal(partitionKeysAfterReconnection))