diff --git a/README.md b/README.md index c647639..37cd1bc 100644 --- a/README.md +++ b/README.md @@ -19,8 +19,10 @@ To get started using the Solace PubSub+ API for Go, simply include it as a requi The Solace PubSub+ API for Go is a wrapper around the high performance Solace C API via Cgo and has support for the following operating systems: - Linux (x86/x86_64) variants with Linux 2.6 or later (compatible with glibc (desktop/server) and musl-c (Alpine Linux)) +- Linux (arm64) variants compatible with glibc (desktop/server) - Windows WSL 2.0 - macOS 10.15 and later (x86_64 versions) +- macOS 11.0 and later (arm64 versions) ## Contributing diff --git a/internal/ccsmp/dummy.go b/internal/ccsmp/dummy.go index 06dd689..b032b21 100644 --- a/internal/ccsmp/dummy.go +++ b/internal/ccsmp/dummy.go @@ -25,5 +25,6 @@ package ccsmp import ( _ "solace.dev/go/messaging/internal/ccsmp/lib/darwin" _ "solace.dev/go/messaging/internal/ccsmp/lib/include/solclient" - _ "solace.dev/go/messaging/internal/ccsmp/lib/linux" + _ "solace.dev/go/messaging/internal/ccsmp/lib/linux_amd64" + _ "solace.dev/go/messaging/internal/ccsmp/lib/linux_arm64" ) diff --git a/internal/ccsmp/includes_darwin_amd64.go b/internal/ccsmp/includes_darwin_amd64.go index aae7daa..b6af143 100644 --- a/internal/ccsmp/includes_darwin_amd64.go +++ b/internal/ccsmp/includes_darwin_amd64.go @@ -22,5 +22,3 @@ package ccsmp #cgo LDFLAGS: -L/usr/local/opt/openssl@1.1/lib ${SRCDIR}/lib/darwin/libsolclient.a -lssl -lcrypto -framework Kerberos */ import "C" - -var SolClientRunningOnAlpine bool = false diff --git a/internal/ccsmp/includes_darwin_arm64.go b/internal/ccsmp/includes_darwin_arm64.go index 76657db..309bfed 100644 --- a/internal/ccsmp/includes_darwin_arm64.go +++ b/internal/ccsmp/includes_darwin_arm64.go @@ -22,5 +22,3 @@ package ccsmp #cgo LDFLAGS: -L/opt/homebrew/opt/openssl@1.1/lib ${SRCDIR}/lib/darwin/libsolclient.a -lssl -lcrypto -framework Kerberos */ import "C" - -var SolClientRunningOnAlpine bool = false diff --git a/internal/ccsmp/includes_linux.go b/internal/ccsmp/includes_linux_amd64.go similarity index 81% rename from internal/ccsmp/includes_linux.go rename to internal/ccsmp/includes_linux_amd64.go index a4a1624..87f931d 100644 --- a/internal/ccsmp/includes_linux.go +++ b/internal/ccsmp/includes_linux_amd64.go @@ -19,16 +19,10 @@ package ccsmp /* // specific flags for linux static builds in C #cgo CFLAGS: -I${SRCDIR}/lib/include -#cgo LDFLAGS: ${SRCDIR}/lib/linux/libsolclient.a -lm -ldl -lpthread -lrt +#cgo LDFLAGS: ${SRCDIR}/lib/linux_amd64/libsolclient.a -lm -ldl -lpthread -lrt #include -#include -#ifdef __GLIBC__ -#define SOLCLIENT_USING_MUSL 0 -#else -#define SOLCLIENT_USING_MUSL 1 -#endif */ import "C" @@ -38,5 +32,3 @@ import "C" func funcToLinkAgainstLibdl() { C.dlerror() } - -var SolClientRunningOnAlpine bool = C.SOLCLIENT_USING_MUSL == 1 diff --git a/internal/ccsmp/includes_linux_arm64.go b/internal/ccsmp/includes_linux_arm64.go new file mode 100644 index 0000000..c5ae4cf --- /dev/null +++ b/internal/ccsmp/includes_linux_arm64.go @@ -0,0 +1,34 @@ +// pubsubplus-go-client +// +// Copyright 2021-2023 Solace Corporation. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ccsmp + +/* +// specific flags for linux static builds in C +#cgo CFLAGS: -I${SRCDIR}/lib/include +#cgo LDFLAGS: ${SRCDIR}/lib/linux_arm64/libsolclient.a -lm -ldl -lpthread -lrt + +#include + +*/ +import "C" + +// Because of some strange behaviour of CCSMP, libdl is not actually required as far as the compiler is concerned. +// As a result, we must write a function that allows us to link against the dynamic libdl system library. This +// function should NOT be used as it does nothing and may cause issues. +func funcToLinkAgainstLibdl() { + C.dlerror() +} diff --git a/internal/ccsmp/lib/darwin/libsolclient.a b/internal/ccsmp/lib/darwin/libsolclient.a index f5fdaf1..ec601c2 100644 Binary files a/internal/ccsmp/lib/darwin/libsolclient.a and b/internal/ccsmp/lib/darwin/libsolclient.a differ diff --git a/internal/ccsmp/lib/include/solclient/solClient.h b/internal/ccsmp/lib/include/solclient/solClient.h index ebaa2fa..a1dcf1a 100644 --- a/internal/ccsmp/lib/include/solclient/solClient.h +++ b/internal/ccsmp/lib/include/solclient/solClient.h @@ -27,6 +27,7 @@ extern "C" { #endif /* _cplusplus */ + /** @mainpage @@ -1101,7 +1102,7 @@ typedef struct solClient_field { * * SOLCLIENT_SUBCODE_TE_SHUTDOWN * An attempt was made to operate on a shutdown Guaranteed Delivery Topic Endpoint. -* 503 Durable Topic Endpoint Shutdown, 503 TE Shutdown, 503 Endpoint Shutdown +* 503 Durable Topic Endpoint Shutdown, 503 TE Shutdown * * * SOLCLIENT_SUBCODE_NO_MORE_NON_DURABLE_QUEUE_OR_TE @@ -1543,6 +1544,16 @@ typedef struct solClient_field { * Egress selectors are not permitted when binding to a Partitioned Queue. * 403 Selectors Not Supported on Partititoned Queue * +* +* SOLCLIENT_SUBCODE_SYNC_REPLICATION_INELIGIBLE +* A guaranteed message was rejected because the broker has been configured to reject messages when sync replication mode is ineligible. A transaction commit failed because replication became ineligible during the transaction. +* 503 Sync Replication Ineligible +* +* +* SOLCLIENT_SUBCODE_ENDPOINT_SHUTDOWN +* The client has attempted to publish to a topic that matched a queue or topic endpoint subscription which has its ingress flow shutdown. +* 503 Endpoint Shutdown +* * */ typedef enum solClient_subCode @@ -1711,7 +1722,9 @@ typedef struct solClient_field { SOLCLIENT_SUBCODE_MESSAGE_ID_NOT_COMPARABLE = 157, /**< Replication Group Message Id are not comparable. Messages must be published to the same broker or HA pair for their Replicaton Group Message Id to be comparable. */ SOLCLIENT_SUBCODE_REPLAY_ANONYMOUS_NOT_SUPPORTED = 158, /**< The client attempted to start replay on a flow bound to an anonymous queue. */ SOLCLIENT_SUBCODE_BROWSING_NOT_SUPPORTED_ON_PARTITIONED_QUEUE = 159, /**< Browser flows to Partitioned Queues are not permitted. */ - SOLCLIENT_SUBCODE_SELECTORS_NOT_SUPPORTED_ON_PARTITIONED_QUEUE = 160 /**< Egress selectors are not permitted when binding to a Partitioned Queue. */ + SOLCLIENT_SUBCODE_SELECTORS_NOT_SUPPORTED_ON_PARTITIONED_QUEUE = 160, /**< Egress selectors are not permitted when binding to a Partitioned Queue. */ + SOLCLIENT_SUBCODE_SYNC_REPLICATION_INELIGIBLE = 161, /**< A guaranteed message was rejected because the broker has been configured to reject messages when sync replication mode is ineligible. A transaction commit failed because replication became ineligible during the transaction. */ + SOLCLIENT_SUBCODE_ENDPOINT_SHUTDOWN = 162, /**< The client has attempted to publish to a topic that matched a queue or topic endpoint subscription which has its ingress flow shutdown. */ /* * ADDING NEW SUBCODES: When adding a new subcode always add a new entry to the HTML table in * the comment above this enumeration @@ -2575,6 +2588,7 @@ Note: This property is used for all entries specified by the property ::SOLCLIEN #define SOLCLIENT_SESSION_CAPABILITY_LONG_SELECTORS "SESSION_CAPABILITY_LONG_SELECTORS" /**< Boolean - The peer can support selectors longer than 1023 bytes */ #define SOLCLIENT_SESSION_CAPABILITY_SHARED_SUBSCRIPTIONS "SESSION_CAPABILITY_SHARED_SUBSCRIPTIONS" /**< Boolean - The peer can support \#shared and \#noexport subscriptions */ #define SOLCLIENT_SESSION_CAPABILITY_BR_REPLAY_ERRORID "SESSION_CAPABILITY_BR_REPLAY_ERRORID" /**< Boolean - The peer can support the endpoint error id parameter on the flow bind response during message replay */ +#define SOLCLIENT_SESSION_CAPABILITY_VAR_LEN_EXT_PARAM "SESSION_CAPABILITY_VAR_LEN_EXT_PARAM" /**< Boolean - The peer can support variable length extended parameters. */ #define SOLCLIENT_SESSION_CAPABILITY_ADCTRL_VERSION_MIN "SESSION_CAPABILITY_ADCTRL_VERSION_MIN" /**< Uint32 - Lowest AdCtrl version supported by the broker. */ #define SOLCLIENT_SESSION_CAPABILITY_ADCTRL_VERSION_MAX "SESSION_CAPABILITY_ADCTRL_VERSION_MAX" /**< Uint32 - Highest AdCtrl version supported by the broker. */ /*@}*/ @@ -5603,6 +5617,8 @@ solClient_dllExport solClient_returnCode_t solClient_flow_getTransactedSession(solClient_opaqueFlow_pt flow_p, solClient_opaqueTransactedSession_pt *transactedSession_p); + + #ifndef SOLCLIENT_EXCLUDE_DEPRECATED #include "solClientDeprecated.h" #endif /* SOLCLIENT_EXCLUDE_DEPRECATED */ diff --git a/internal/ccsmp/lib/linux/dummy.go b/internal/ccsmp/lib/linux_amd64/dummy.go similarity index 86% rename from internal/ccsmp/lib/linux/dummy.go rename to internal/ccsmp/lib/linux_amd64/dummy.go index 7642794..8b58163 100644 --- a/internal/ccsmp/lib/linux/dummy.go +++ b/internal/ccsmp/lib/linux_amd64/dummy.go @@ -17,6 +17,6 @@ //go:build dummy // +build dummy -// Package linux is provided as a workaround for go vendoring and contains no go code. +// Package linux_amd64 is provided as a workaround for go vendoring and contains no go code. // See internal/ccsmp/dummy.go for more information. -package linux +package linux_amd64 diff --git a/internal/ccsmp/lib/linux/libsolclient.a b/internal/ccsmp/lib/linux_amd64/libsolclient.a similarity index 65% rename from internal/ccsmp/lib/linux/libsolclient.a rename to internal/ccsmp/lib/linux_amd64/libsolclient.a index 6a082dc..d20ecb1 100644 Binary files a/internal/ccsmp/lib/linux/libsolclient.a and b/internal/ccsmp/lib/linux_amd64/libsolclient.a differ diff --git a/internal/ccsmp/lib/linux_arm64/dummy.go b/internal/ccsmp/lib/linux_arm64/dummy.go new file mode 100644 index 0000000..7188dd8 --- /dev/null +++ b/internal/ccsmp/lib/linux_arm64/dummy.go @@ -0,0 +1,22 @@ +// pubsubplus-go-client +// +// Copyright 2021-2023 Solace Corporation. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build dummy +// +build dummy + +// Package linux_arm64 is provided as a workaround for go vendoring and contains no go code. +// See internal/ccsmp/dummy.go for more information. +package linux_arm64 diff --git a/internal/ccsmp/lib/linux_arm64/libsolclient.a b/internal/ccsmp/lib/linux_arm64/libsolclient.a new file mode 100755 index 0000000..9f116b1 Binary files /dev/null and b/internal/ccsmp/lib/linux_arm64/libsolclient.a differ diff --git a/internal/impl/core/init.go b/internal/impl/core/init.go index 081df61..be1af2f 100644 --- a/internal/impl/core/init.go +++ b/internal/impl/core/init.go @@ -42,11 +42,6 @@ var libraryEnvironmentMapping = map[string]string{ // ccsmp initialization, calls solClient_initialize func init() { propertyMap := make(map[string]string) - if ccsmp.SolClientRunningOnAlpine { - // SOL-78608 we need to use the full library names to pick up the default alpine SSL libs - propertyMap[ccsmp.SolClientGlobalPropCryptoLib] = "libcrypto.so.1.1" - propertyMap[ccsmp.SolClientGlobalPropSslLib] = "libssl.so.1.1" - } for _, env := range supportedKeys { if val, ok := os.LookupEnv(env); ok { var key string diff --git a/pkg/solace/subcode/subcode_generated.go b/pkg/solace/subcode/subcode_generated.go index b7f3c33..54c26bf 100644 --- a/pkg/solace/subcode/subcode_generated.go +++ b/pkg/solace/subcode/subcode_generated.go @@ -340,4 +340,8 @@ const ( BrowsingNotSupportedOnPartitionedQueue Code = 159 // SelectorsNotSupportedOnPartitionedQueue: Egress selectors are not permitted when binding to a Partitioned Queue. SelectorsNotSupportedOnPartitionedQueue Code = 160 + // SyncReplicationIneligible: A guaranteed message was rejected because the broker has been configured to reject messages when sync replication mode is ineligible. A transaction commit failed because replication became ineligible during the transaction. + SyncReplicationIneligible Code = 161 + // EndpointShutdown: The client has attempted to publish to a topic that matched a queue or topic endpoint subscription which has its ingress flow shutdown. + EndpointShutdown Code = 162 ) diff --git a/test/README.md b/test/README.md index 6f840a9..b0a46e5 100644 --- a/test/README.md +++ b/test/README.md @@ -11,7 +11,7 @@ The integration tests are stored in this directory. These integration tests are ## Running the tests -First, docker and docker-compose must be installed and accessible to the current user. Second, `go generate` must be run from the `./test/sempclient` directory. The tests can be run by navigating to the test directory and running `go test`. This will start by spinning up a docker container containing the broker and then running the tests on that container. To run against an external broker, `go test -tags remote` can be used to instruct the tests to target an environment variable based broker. See the [Environment Variables](#environment-variables) section below for more information. +First, docker and docker-compose must be installed and accessible to the current user. Second, `go generate` must be run from the `./test/sempclient` directory. The tests can be run by navigating to the test directory and running `ginkgo` or 'go test'. 'ginkgo' is preferable as the pre-configured timeout (for the whole test suite) is 1hr whereas the default timeout using 'go test' is 10mins. This will start by spinning up a docker container containing the broker and then running the tests on that container. To run against an external broker, `go test -tags remote` can be used to instruct the tests to target an environment variable based broker. See the [Environment Variables](#environment-variables) section below for more information. To run an individual test, first install the ginkgo command line tool with `go install github.com/onsi/ginkgo/v2/ginkgo@latest`, then run the tests with `ginkgo --focus="mytestregex"` from the test directory. This regex will match describe/context/it strings. For more information, see the [ginkgo documentation](https://onsi.github.io/ginkgo/#the-spec-runner). diff --git a/test/data/config/config_testcontainers.json b/test/data/config/config_testcontainers.json index b02bf81..07d91d4 100644 --- a/test/data/config/config_testcontainers.json +++ b/test/data/config/config_testcontainers.json @@ -15,7 +15,7 @@ }, "testcontainers": { "broker_hostname": "solbroker", - "broker_tag": "10.3", + "broker_tag": "10.4", "broker_repo": "solace/solace-pubsub", "broker_edition": "standard", "toxiproxy_hostname": "toxiproxy", diff --git a/test/helpers/resource_helpers.go b/test/helpers/resource_helpers.go index d765661..eaf2d95 100644 --- a/test/helpers/resource_helpers.go +++ b/test/helpers/resource_helpers.go @@ -62,6 +62,28 @@ func CreateNonExclusiveQueue(queueName string, topics ...string) { } } +// CreatePartitionedQueue function +func CreatePartitionedQueue(queueName string, partitionCount int32, partitionRebalanceDelay int64, topics ...string) { + _, _, err := testcontext.SEMP().Config().QueueApi.CreateMsgVpnQueue(testcontext.SEMP() .ConfigCtx(), sempconfig.MsgVpnQueue{ + QueueName: queueName, + AccessType: "non-exclusive", + Permission: "modify-topic", + IngressEnabled: True, + EgressEnabled: True, + PartitionCount: partitionCount, + PartitionRebalanceDelay: partitionRebalanceDelay, + Owner: "default", + }, testcontext.Messaging().VPN, nil) + ExpectWithOffset(1, err).ToNot(HaveOccurred(), "Failed to create queue with name "+queueName) + for _, topic := range topics { + _, _, err = testcontext.SEMP().Config().QueueApi.CreateMsgVpnQueueSubscription(testcontext.SEMP().ConfigCtx(), + sempconfig.MsgVpnQueueSubscription{ + SubscriptionTopic: topic, + }, testcontext.Messaging().VPN, queueName, nil) + ExpectWithOffset(1, err).ToNot(HaveOccurred(), "Failed to add subscription for topic "+topic) + } +} + // CreateQueueSubscription function func CreateQueueSubscription(queueName string, topic string) { _, _, err := testcontext.SEMP().Config().QueueApi.CreateMsgVpnQueueSubscription(testcontext.SEMP().ConfigCtx(), diff --git a/test/messaging_service_test.go b/test/messaging_service_test.go index 79fa907..b17112f 100644 --- a/test/messaging_service_test.go +++ b/test/messaging_service_test.go @@ -453,23 +453,32 @@ var _ = Describe("MessagingService Lifecycle", func() { Expect(client.TlsCipherDescription).To(HavePrefix("AES128-SHA")) }) }) - // We need to explicitly enable TLS1.1 to test a few cases - Context("when allowing TLS1.1 connections", func() { + // Originially this explicitly test tls1.1 + // on systems with new openssl (3.0 or later) tls1.1 is no longer supported from the client + // As a result this is adapted to explicitly verify tls1.2 in anticipation for tls1.3 + // once openssl 1.1 support is deprecated this maybe + // We need to explicitly enable TLS1.2 to test a few cases + Context("when allowing TLS1.2 connections", func() { BeforeEach(func() { - testcontext.SEMP().Config().AllApi.UpdateBroker(testcontext.SEMP().ConfigCtx(), sempconfig.Broker{ - TlsBlockVersion11Enabled: helpers.False, - }, nil) + // semp configuration for tls version support + // revist for enabling support for tls 1.2 in the future + //testcontext.SEMP().Config().AllApi.UpdateBroker(testcontext.SEMP().ConfigCtx(), sempconfig.Broker{ + // TlsBlockVersion11Enabled: helpers.False, + //}, nil) + }) AfterEach(func() { - testcontext.SEMP().Config().AllApi.UpdateBroker(testcontext.SEMP().ConfigCtx(), sempconfig.Broker{ - TlsBlockVersion11Enabled: helpers.True, - }, nil) + // semp configuration for tls version support + // revist for disabling support for tls 1.2 in the future + //testcontext.SEMP().Config().AllApi.UpdateBroker(testcontext.SEMP().ConfigCtx(), sempconfig.Broker{ + // TlsBlockVersion11Enabled: helpers.True, + //}, nil) }) It("should be able to connect with excluded protocols", func() { builder.WithTransportSecurityStrategy(config.NewTransportSecurityStrategy(). - WithExcludedProtocols(config.TransportSecurityProtocolTLSv1_2)) + WithExcludedProtocols(config.TransportSecurityProtocolSSLv3, config.TransportSecurityProtocolTLSv1, config.TransportSecurityProtocolTLSv1_1)) helpers.TestConnectDisconnectMessagingServiceClientValidation(builder, func(client *monitor.MsgVpnClient) { - Expect(client.TlsVersion).To(BeEquivalentTo(config.TransportSecurityProtocolTLSv1_1)) + Expect(client.TlsVersion).To(BeEquivalentTo(config.TransportSecurityProtocolTLSv1_2)) }) }) }) diff --git a/test/partitioned_queue_test.go b/test/partitioned_queue_test.go new file mode 100644 index 0000000..9009560 --- /dev/null +++ b/test/partitioned_queue_test.go @@ -0,0 +1,322 @@ +// pubsubplus-go-client +// +// Copyright 2021-2022 Solace Corporation. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test + +import ( + "time" + "fmt" + "strconv" + + "solace.dev/go/messaging" + "solace.dev/go/messaging/pkg/solace" + "solace.dev/go/messaging/pkg/solace/config" + "solace.dev/go/messaging/pkg/solace/metrics" + "solace.dev/go/messaging/pkg/solace/resource" + //"solace.dev/go/messaging/pkg/solace/subcode" + "solace.dev/go/messaging/test/helpers" + "solace.dev/go/messaging/test/testcontext" + "solace.dev/go/messaging/pkg/solace/message" + + sempconfig "solace.dev/go/messaging/test/sempclient/config" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Partitioned Queue Tests", func() { + var queueName string = "partitioned_queue_test" + var topicName string = "partitioned_queue_topic_test" + var rebalanceDelay int64 = 5 + var partitionCount int32 = 3 + Context("queue has three partitions and rebalance delay of 1 second", func() { + BeforeEach(func() { + helpers.CreatePartitionedQueue(queueName, partitionCount, rebalanceDelay, topicName) + }) + + AfterEach(func() { + helpers.DeleteQueue(queueName) + }) + + It("should have at least one key assigned to each partition and same keyed messages go to same partition", func() { + var messagingServices[4]solace.MessagingService + var partitionKeys[9]string + + //generate partition keys + for i := 0; i < 9; i++{ + partitionKeys[i] = "key_"+ strconv.Itoa(i) + } + + for i := 0; i < 4; i++{ + messagingServices[i] = helpers.BuildMessagingService(messaging.NewMessagingServiceBuilder().FromConfigurationProvider(helpers.DefaultConfiguration())) + helpers.ConnectMessagingService(messagingServices[i]) + } + + defer func() { + for i := 0; i < 4; i++{ + helpers.DisconnectMessagingService(messagingServices[i]) + } + }() + + + partitionedQueue := resource.QueueDurableNonExclusive(queueName) + publisher := helpers.NewPersistentPublisher(messagingServices[0]) + + + receiverOne, _ := messagingServices[1].CreatePersistentMessageReceiverBuilder(). + WithSubscriptions(resource.TopicSubscriptionOf(topicName)).Build(partitionedQueue) + + receiverTwo, _ := messagingServices[2].CreatePersistentMessageReceiverBuilder(). + WithSubscriptions(resource.TopicSubscriptionOf(topicName)).Build(partitionedQueue) + + receiverThree, _ := messagingServices[3].CreatePersistentMessageReceiverBuilder(). + WithSubscriptions(resource.TopicSubscriptionOf(topicName)).Build(partitionedQueue) + + publisher.Start() + receiverOne.Start() + receiverTwo.Start() + receiverThree.Start() + + messageBuilder := messagingServices[0].MessageBuilder() + for i := 0; i < 18; i++{ + msg, _ := messageBuilder.WithProperty(config.MessageProperty(config.QueuePartitionKey), partitionKeys[i % 9]).BuildWithStringPayload("Hi Solace") + publisher.Publish(msg, resource.TopicOf(topicName), nil, nil) + } + + publisher.Terminate(5 * time.Second) + + messageHandler := func(message message.InboundMessage) { + fmt.Println("message received") + } + + receiverOne.ReceiveAsync(messageHandler) + receiverTwo.ReceiveAsync(messageHandler) + receiverThree.ReceiveAsync(messageHandler) + + publisherMetrics := messagingServices[0].Metrics() + receiverOneMetrics := messagingServices[1].Metrics() + receiverTwoMetrics := messagingServices[2].Metrics() + receiverThreeMetrics := messagingServices[3].Metrics() + + Eventually(func() uint64 { + return receiverOneMetrics.GetValue(metrics.PersistentMessagesReceived) + }).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 2)) + + Eventually(func() uint64 { + return receiverTwoMetrics.GetValue(metrics.PersistentMessagesReceived) + }).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 2)) + + Eventually(func() uint64 { + return receiverThreeMetrics.GetValue(metrics.PersistentMessagesReceived) + }).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 2)) + + Eventually( func() uint64 { + totalMessagesReceived := receiverOneMetrics. + GetValue(metrics.PersistentMessagesReceived) + receiverTwoMetrics.GetValue(metrics.PersistentMessagesReceived) + receiverThreeMetrics.GetValue(metrics.PersistentMessagesReceived) + return totalMessagesReceived + }).WithTimeout(10 * time.Second).Should(Equal(publisherMetrics.GetValue(metrics.TotalMessagesSent))) + + Expect(receiverOne.Terminate(10 * time.Second)).ToNot(HaveOccurred()) + Expect(receiverTwo.Terminate(10 * time.Second)).ToNot(HaveOccurred()) + Expect(receiverThree.Terminate(10 * time.Second)).ToNot(HaveOccurred()) + }) + + It("generates flow inactive event when no partitions left for consumer to bind", func() { + + var listenerOne solace.ReceiverStateChangeListener + var listenerTwo solace.ReceiverStateChangeListener + var listenerThree solace.ReceiverStateChangeListener + + var messagingServices[3]solace.MessagingService + + partitionedQueue := resource.QueueDurableNonExclusive(queueName) + + for i := 0; i < 3; i++{ + messagingServices[i] = helpers.BuildMessagingService(messaging.NewMessagingServiceBuilder(). + FromConfigurationProvider(helpers.DefaultConfiguration())) + helpers.ConnectMessagingService(messagingServices[i]) + } + + defer func() { + for i := 0; i < 3; i++{ + helpers.DisconnectMessagingService(messagingServices[i]) + } + }() + //activeStateTransitions refer to start-up induced state changes + //(i.e., receiver transition from passive to active whereas passive transitions are induced on partition downscale + activeStateTransitions, passiveStateTransitions := 0, 0 + ch := make(chan struct{}) + + passiveTransitionIncrementor := func(oldState, newState solace.ReceiverState, timestamp time.Time) { + if oldState == solace.ReceiverActive && newState == solace.ReceiverPassive { + passiveStateTransitions++ + } else { + activeStateTransitions++ + } + + if passiveStateTransitions == 2 && activeStateTransitions == 3 { + close(ch) + } + } + + listenerOne, listenerTwo, listenerThree = passiveTransitionIncrementor, passiveTransitionIncrementor, passiveTransitionIncrementor + + receiverOne, _ := messagingServices[0].CreatePersistentMessageReceiverBuilder(). + WithSubscriptions(resource.TopicSubscriptionOf(topicName)).WithActivationPassivationSupport(listenerOne).Build(partitionedQueue) + + receiverTwo, _ := messagingServices[1].CreatePersistentMessageReceiverBuilder(). + WithSubscriptions(resource.TopicSubscriptionOf(topicName)).WithActivationPassivationSupport(listenerTwo).Build(partitionedQueue) + + receiverThree, _ := messagingServices[2].CreatePersistentMessageReceiverBuilder(). + WithSubscriptions(resource.TopicSubscriptionOf(topicName)).WithActivationPassivationSupport(listenerThree).Build(partitionedQueue) + + Expect(receiverOne.Start()).ToNot(HaveOccurred()) + Expect(receiverTwo.Start()).ToNot(HaveOccurred()) + Expect(receiverThree.Start()).ToNot(HaveOccurred()) + + time.Sleep(10 * time.Second) + + testcontext.SEMP().Config().QueueApi.UpdateMsgVpnQueue( + testcontext.SEMP().ConfigCtx(), + sempconfig.MsgVpnQueue{ + PartitionCount: 1, + }, + testcontext.Messaging().VPN, + queueName, + nil, + ) + + Eventually(ch).WithTimeout(10 * time.Second).Should(BeClosed()) + + Expect(receiverOne.Terminate(10 * time.Second)).ToNot(HaveOccurred()) + Expect(receiverTwo.Terminate(10 * time.Second)).ToNot(HaveOccurred()) + Expect(receiverThree.Terminate(10 * time.Second)).ToNot(HaveOccurred()) + }) + It("rebinds to same partition after reconnect within rebalance delay", func () { + var messagingServices[3]solace.MessagingService + for i := 0; i < 3; i++{ + if i == 2 { + messagingServices[i] = helpers.BuildMessagingService(messaging.NewMessagingServiceBuilder(). + FromConfigurationProvider(helpers.ToxicConfiguration())) + } else { + messagingServices[i] = helpers.BuildMessagingService(messaging.NewMessagingServiceBuilder(). + FromConfigurationProvider(helpers.DefaultConfiguration())) + } + + helpers.ConnectMessagingService(messagingServices[i]) + } + + defer func() { + for i := 0; i < 3; i++{ + helpers.DisconnectMessagingService(messagingServices[i]) + } + }() + + var partitionKeys[9]string + for i := 0; i < 9; i++{ + partitionKeys[i] = "key_"+ strconv.Itoa(i) + } + + publisher := helpers.NewPersistentPublisher(messagingServices[0]) + messageBuilder := messagingServices[0].MessageBuilder() + publisher.Start() + + publisherMetrics := messagingServices[0].Metrics() + publishMessages := func (firstConnectionAttempt bool) { + for i := 0; i < 18; i++{ + msg, _ := messageBuilder.WithProperty(config.MessageProperty(config.QueuePartitionKey), partitionKeys[i % 9]).BuildWithStringPayload("Hi Solace") + publisher.Publish(msg, resource.TopicOf(topicName), nil, nil) + } + + if firstConnectionAttempt { + Eventually(func() uint64 { + return publisherMetrics.GetValue(metrics.TotalMessagesSent) + }).WithTimeout(30 * time.Second).Should(BeNumerically("==", 18)) + } else { + Eventually(func() uint64 { + return publisherMetrics.GetValue(metrics.TotalMessagesSent) + }).WithTimeout(30 * time.Second).Should(BeNumerically("==", 36)) + } + } + + publishMessages(true) + + partitionedQueue := resource.QueueDurableNonExclusive(queueName) + receiverOne, _ := messagingServices[1].CreatePersistentMessageReceiverBuilder(). + WithSubscriptions(resource.TopicSubscriptionOf(topicName)).Build(partitionedQueue) + receiverOne.Start() + receiverOnePartitionKeys := make([]string, 0, 18) + receiverOneMessageHandler := func (message message.InboundMessage) { + partitionKey, _ := message.GetProperty("JMSXGroupID") + partitionKeyValue := fmt.Sprint(partitionKey) + receiverOnePartitionKeys = append(receiverOnePartitionKeys, partitionKeyValue) + } + + receiverTwo, _ := messagingServices[2].CreatePersistentMessageReceiverBuilder(). + WithSubscriptions(resource.TopicSubscriptionOf(topicName)).Build(partitionedQueue) + receiverTwo.Start() + receiverTwoMessageHandler := func (message message.InboundMessage){ + fmt.Println("Received message in receiverTwo") + } + + receiverOne.ReceiveAsync(receiverOneMessageHandler) + receiverTwo.ReceiveAsync(receiverTwoMessageHandler) + + receiverOneMetrics := messagingServices[1].Metrics() + receiverTwoMetrics := messagingServices[2].Metrics() + + Eventually( func() uint64 { + totalMessagesReceived := receiverOneMetrics.GetValue(metrics.PersistentMessagesReceived) + receiverTwoMetrics.GetValue(metrics.PersistentMessagesReceived) + return totalMessagesReceived + }).WithTimeout(30 * time.Second).Should(Equal(publisherMetrics.GetValue(metrics.TotalMessagesSent))) + + partitionKeysBeforeDisconnect := make([]string, len(receiverOnePartitionKeys)) + copy(receiverOnePartitionKeys, partitionKeysBeforeDisconnect) + receiverOnePartitionKeys = receiverOnePartitionKeys[:0] + + reconnectionListenerChan := make(chan struct{}) + messagingServices[2].AddReconnectionListener(func(even solace.ServiceEvent) { + close(reconnectionListenerChan) + }) + + reconnectAttemptListenerChan := make(chan struct{}) + messagingServices[2].AddReconnectionAttemptListener(func(event solace.ServiceEvent) { + testcontext.Toxi().SMF().Enable() + close(reconnectAttemptListenerChan) + }) + + //temporarily disconnect receiverTwo + testcontext.Toxi().SMF().Disable() + + Eventually(reconnectionListenerChan).WithTimeout(30 * time.Second).Should(BeClosed()) + + //republish messages + publishMessages(false) + + Eventually( func() uint64 { + totalMessagesReceived := receiverOneMetrics.GetValue(metrics.PersistentMessagesReceived) + receiverTwoMetrics.GetValue(metrics.PersistentMessagesReceived) + return totalMessagesReceived + }).WithTimeout(30 * time.Second).Should(BeNumerically(">=", publisherMetrics.GetValue(metrics.TotalMessagesSent))) + + partitionKeysAfterReconnection := make([]string, len(receiverOnePartitionKeys)) + copy(receiverOnePartitionKeys, partitionKeysAfterReconnection) + + Expect(partitionKeysBeforeDisconnect).Should(Equal(partitionKeysAfterReconnection)) + Expect(receiverOne.Terminate(10 * time.Second)).ToNot(HaveOccurred()) + Expect(receiverTwo.Terminate(10 * time.Second)).ToNot(HaveOccurred()) + }) + }) +}) diff --git a/test/sempclient/spec/spec_config.json b/test/sempclient/spec/spec_config.json index 4821fa8..e14b76b 100644 --- a/test/sempclient/spec/spec_config.json +++ b/test/sempclient/spec/spec_config.json @@ -6219,6 +6219,21 @@ "description": "The Client Username that owns the Queue and has permission equivalent to `\"delete\"`. Modifying this attribute while the object (or the relevant part of the object) is administratively enabled may be service impacting as egressEnabled will be temporarily set to false to apply the change. Changes to this attribute are synchronized to HA mates and replication sites via config-sync. The default value is `\"\"`.", "type": "string" }, + "partitionCount": { + "description": "The count of partitions of the queue. Only relevant for queues with an access type of non-exclusive. When zero, bound clients receive messages round-robin. Otherwise, bound clients receive messages from individually assigned partitions. Changes to this attribute are synchronized to HA mates and replication sites via config-sync. The default value is `0`. Available since (will be released in next SEMP version).", + "format": "int32", + "type": "integer" + }, + "partitionRebalanceDelay": { + "description": "The delay (in seconds) before a partition rebalance is started once needed. Changes to this attribute are synchronized to HA mates and replication sites via config-sync. The default value is `5`. Available since (will be released in next SEMP version).", + "format": "int64", + "type": "integer" + }, + "partitionRebalanceMaxHandoffTime": { + "description": "The maximum time (in seconds) to wait before handing off a partition while rebalancing. Changes to this attribute are synchronized to HA mates and replication sites via config-sync. The default value is `3`. Available since (will be released in next SEMP version).", + "format": "int64", + "type": "integer" + }, "permission": { "description": "The permission level for all consumers of the Queue, excluding the owner. Modifying this attribute while the object (or the relevant part of the object) is administratively enabled may be service impacting as egressEnabled will be temporarily set to false to apply the change. Changes to this attribute are synchronized to HA mates and replication sites via config-sync. The default value is `\"no-access\"`. The allowed values and their meaning are:\n\n
\n\"no-access\" - Disallows all access.\n\"read-only\" - Read-only access to the messages.\n\"consume\" - Consume (read and remove) messages.\n\"modify-topic\" - Consume messages or modify the topic/selector.\n\"delete\" - Consume messages, modify the topic/selector or delete the Client created endpoint altogether.\n
\n", "enum": [ @@ -6385,7 +6400,7 @@ "MsgVpnQueueTemplate": { "properties": { "accessType": { - "description": "The access type for delivering messages to consumer flows. Changes to this attribute are synchronized to HA mates and replication sites via config-sync. The default value is `\"exclusive\"`. The allowed values and their meaning are:\n\n
\n\"exclusive\" - Exclusive delivery of messages to the first bound consumer flow.\n\"non-exclusive\" - Non-exclusive delivery of messages to all bound consumer flows in a round-robin fashion.\n
\n", + "description": "The access type for delivering messages to consumer flows. Changes to this attribute are synchronized to HA mates and replication sites via config-sync. The default value is `\"exclusive\"`. The allowed values and their meaning are:\n\n
\n\"exclusive\" - Exclusive delivery of messages to the first bound consumer flow.\n\"non-exclusive\" - Non-exclusive delivery of messages to bound consumer flows in a round-robin (if partition count is zero) or partitioned (if partition count is non-zero) fashion.\n
\n", "enum": [ "exclusive", "non-exclusive" @@ -8113,7 +8128,7 @@ "MsgVpnTopicEndpointTemplate": { "properties": { "accessType": { - "description": "The access type for delivering messages to consumer flows. Changes to this attribute are synchronized to HA mates and replication sites via config-sync. The default value is `\"exclusive\"`. The allowed values and their meaning are:\n\n
\n\"exclusive\" - Exclusive delivery of messages to the first bound consumer flow.\n\"non-exclusive\" - Non-exclusive delivery of messages to all bound consumer flows in a round-robin fashion.\n
\n", + "description": "The access type for delivering messages to consumer flows. Changes to this attribute are synchronized to HA mates and replication sites via config-sync. The default value is `\"exclusive\"`. The allowed values and their meaning are:\n\n
\n\"exclusive\" - Exclusive delivery of messages to the first bound consumer flow.\n\"non-exclusive\" - Non-exclusive delivery of messages to bound consumer flows in a round-robin (if partition count is zero) or partitioned (if partition count is non-zero) fashion.\n
\n", "enum": [ "exclusive", "non-exclusive" diff --git a/test/sempclient/spec/spec_monitor.json b/test/sempclient/spec/spec_monitor.json index 8783282..89789b7 100644 --- a/test/sempclient/spec/spec_monitor.json +++ b/test/sempclient/spec/spec_monitor.json @@ -12321,7 +12321,7 @@ "MsgVpnQueue": { "properties": { "accessType": { - "description": "The access type for delivering messages to consumer flows bound to the Queue. The allowed values and their meaning are:\n\n
\n\"exclusive\" - Exclusive delivery of messages to the first bound consumer flow.\n\"non-exclusive\" - Non-exclusive delivery of messages to all bound consumer flows in a round-robin fashion.\n
\n", + "description": "The access type for delivering messages to consumer flows bound to the Queue. The allowed values and their meaning are:\n\n
\n\"exclusive\" - Exclusive delivery of messages to the first bound consumer flow.\n\"non-exclusive\" - Non-exclusive delivery of messages to bound consumer flows in a round-robin (if partition count is zero) or partitioned (if partition count is non-zero) fashion.\n
\n", "enum": [ "exclusive", "non-exclusive" @@ -12624,6 +12624,51 @@ "description": "The Client Username that owns the Queue and has permission equivalent to `\"delete\"`.", "type": "string" }, + "partitionClientName": { + "description": "The name of the client assigned to the partition if the client is currently bound to the queue. Only relevant for queues implementing an individual partition. Available since (will be released in next SEMP version).", + "type": "string" + }, + "partitionCount": { + "description": "The count of partitions of the queue. Only relevant for queues with an access type of non-exclusive. When zero, bound clients receive messages round-robin. Otherwise, bound clients receive messages from individually assigned partitions. Available since (will be released in next SEMP version).", + "format": "int32", + "type": "integer" + }, + "partitionNumber": { + "description": "The partition number. Only relevant for queues implementing an individual partition. Available since (will be released in next SEMP version).", + "format": "int32", + "type": "integer" + }, + "partitionOperationalCount": { + "description": "The operational count of partitions of the queue. Only relevant for queues with an access type of non-exclusive. This may not match the configured count while scaling is underway. Available since (will be released in next SEMP version).", + "format": "int32", + "type": "integer" + }, + "partitionQueueName": { + "description": "The name of our partitioned queue. Only relevant for queues implementing an individual partition. Available since (will be released in next SEMP version).", + "type": "string" + }, + "partitionRebalanceDelay": { + "description": "The delay (in seconds) before a partition rebalance is started once needed. Available since (will be released in next SEMP version).", + "format": "int64", + "type": "integer" + }, + "partitionRebalanceMaxHandoffTime": { + "description": "The maximum time (in seconds) to wait before handing off a partition while rebalancing. Available since (will be released in next SEMP version).", + "format": "int64", + "type": "integer" + }, + "partitionRebalanceStatus": { + "description": "The rebalance status of the partitioned queue. Only relevant for queues with an access type of non-exclusive and at least 1 partition. The allowed values and their meaning are:\n\n
\n\"ready\" - Rebalancing is complete.\n\"holddown\" - Rebalancing will start after delay.\n\"rebalancing\" - Rebalancing is underway.\n
\n Available since (will be released in next SEMP version).", + "type": "string" + }, + "partitionScaleStatus": { + "description": "The scale status of the partitioned queue. Only relevant for queues with an access type of non-exclusive and at least 1 partition. The allowed values and their meaning are:\n\n
\n\"invalid-exclusive\" - Exclusive queues have no partitions.\n\"ready\" - Partition scaling is complete.\n\"scaling-up\" - Partitions are being added.\n\"scaling-down\" - Partitions are being removed.\n\"max-partitioned-queues-exceeded\" - Maximum number of partitioned queues has been exceeded.\n\"max-partitions-exceeded\" - Maximum number of partitions has been exceeded for this partitioned queue.\n
\n Available since (will be released in next SEMP version).", + "type": "string" + }, + "partitionStatus": { + "description": "The status of the partition of the partitioned queue. Only relevant for queues implementing an individual partition. The allowed values and their meaning are:\n\n
\n\"unassigned\" - Partition is not assigned to a client.\n\"ready\" - Partition is assigned to a client.\n\"paused\" - Partition is being handed off to another client.\n\"unbound\" - Assigned client is not bound.\n
\n Available since (will be released in next SEMP version).", + "type": "string" + }, "permission": { "description": "The permission level for all consumers of the Queue, excluding the owner. The allowed values and their meaning are:\n\n
\n\"no-access\" - Disallows all access.\n\"read-only\" - Read-only access to the messages.\n\"consume\" - Consume (read and remove) messages.\n\"modify-topic\" - Consume messages or modify the topic/selector.\n\"delete\" - Consume messages, modify the topic/selector or delete the Client created endpoint altogether.\n
\n", "enum": [ @@ -12789,6 +12834,11 @@ "virtualRouter": { "description": "The virtual router of the Queue. The allowed values and their meaning are:\n\n
\n\"primary\" - The endpoint belongs to the primary virtual router.\n\"backup\" - The endpoint belongs to the backup virtual router.\n
\n Deprecated since 2.31. This attribute has been deprecated. When Guaranteed Messaging is active, this value is always the virtual router for which Guaranteed Messaging is enabled. Otherwise, this value should be ignored.", "type": "string" + }, + "xaTransactionNotSupportedDiscardedMsgCount": { + "description": "The number of guaranteed messages discarded by the Queue due to XA Transactions not being supported. Available since (will be released in next SEMP version).", + "format": "int64", + "type": "integer" } }, "type": "object" @@ -12910,6 +12960,15 @@ "description": "The name of the Message VPN.", "type": "string" }, + "partitionKey": { + "description": "The partition key of the Message. Available since (will be released in next SEMP version).", + "type": "string" + }, + "partitionKeyHash": { + "description": "The partition key hash of the Message. Available since (will be released in next SEMP version).", + "format": "int32", + "type": "integer" + }, "priority": { "description": "The priority level of the Message, from 9 (highest) to 0 (lowest).", "format": "int32", @@ -13212,7 +13271,7 @@ "MsgVpnQueueTemplate": { "properties": { "accessType": { - "description": "The access type for delivering messages to consumer flows. The allowed values and their meaning are:\n\n
\n\"exclusive\" - Exclusive delivery of messages to the first bound consumer flow.\n\"non-exclusive\" - Non-exclusive delivery of messages to all bound consumer flows in a round-robin fashion.\n
\n", + "description": "The access type for delivering messages to consumer flows. The allowed values and their meaning are:\n\n
\n\"exclusive\" - Exclusive delivery of messages to the first bound consumer flow.\n\"non-exclusive\" - Non-exclusive delivery of messages to bound consumer flows in a round-robin (if partition count is zero) or partitioned (if partition count is non-zero) fashion.\n
\n", "enum": [ "exclusive", "non-exclusive" @@ -15668,7 +15727,7 @@ "MsgVpnTopicEndpoint": { "properties": { "accessType": { - "description": "The access type for delivering messages to consumer flows bound to the Topic Endpoint. The allowed values and their meaning are:\n\n
\n\"exclusive\" - Exclusive delivery of messages to the first bound consumer flow.\n\"non-exclusive\" - Non-exclusive delivery of messages to all bound consumer flows in a round-robin fashion.\n
\n", + "description": "The access type for delivering messages to consumer flows bound to the Topic Endpoint. The allowed values and their meaning are:\n\n
\n\"exclusive\" - Exclusive delivery of messages to the first bound consumer flow.\n\"non-exclusive\" - Non-exclusive delivery of messages to bound consumer flows in a round-robin (if partition count is zero) or partitioned (if partition count is non-zero) fashion.\n
\n", "enum": [ "exclusive", "non-exclusive" @@ -16492,7 +16551,7 @@ "MsgVpnTopicEndpointTemplate": { "properties": { "accessType": { - "description": "The access type for delivering messages to consumer flows. The allowed values and their meaning are:\n\n
\n\"exclusive\" - Exclusive delivery of messages to the first bound consumer flow.\n\"non-exclusive\" - Non-exclusive delivery of messages to all bound consumer flows in a round-robin fashion.\n
\n", + "description": "The access type for delivering messages to consumer flows. The allowed values and their meaning are:\n\n
\n\"exclusive\" - Exclusive delivery of messages to the first bound consumer flow.\n\"non-exclusive\" - Non-exclusive delivery of messages to bound consumer flows in a round-robin (if partition count is zero) or partitioned (if partition count is non-zero) fashion.\n
\n", "enum": [ "exclusive", "non-exclusive" diff --git a/version.go b/version.go index febe3cc..57efa30 100644 --- a/version.go +++ b/version.go @@ -23,4 +23,4 @@ func init() { core.SetVersion(version) } -const version = "1.3.0" +const version = "1.4.0"