An implementation of Spring’s Cloud Stream Binder for integrating with Solace PubSub+ message brokers. The Spring Cloud Stream Binder project provides a higher-level abstraction towards messaging that standardizes the development of distributed message-based systems.
- Overview
- Spring Cloud Stream Binder
- Using it in your Application
- Configuration Options
- Generated Queue Name Syntax
- Consumer Concurrency
- Manual Message Acknowledgment
- Message Target Destination
- Failed Consumer Message Error Handling
- Failed Producer Message Error Handling
- Publisher Confirmations
- Resources
The Solace implementation of the Spring Cloud Stream Binder maps the following concepts from Spring to Solace:
- Destinations to topic subscriptions (Source apps always send messages to a topic)
- Consumer groups to durable queues
- Anonymous consumer groups to temporary queues (When no group is specified; used for SCS Publish-Subscribe Model)
Internally, each consumer group queue is subscribed to at least its destination topic. A typical message flow then looks as follows:
- Producer bindings publish messages to their destination topics
- Consumer group queues receive the messages published to their destination topic
- Consumers of a particular consumer group consume messages from their group in a round-robin fashion (by default)
Note that partitioning is not yet supported by this version of the binder.
Note that since the Binder always consumes from queues it is currently required that Assured Delivery be enabled on the Solace PubSub+ Message VPN being used (Assured Delivery is automatically enabled if using Solace Cloud.)
Also, it will be assumed that you have a basic understanding of the Spring Cloud Stream project. If not, then please refer to Spring’s documentation. For the sake of brevity, this document will solely focus on discussing components unique to Solace.
This project extends the Spring Cloud Stream Binder project. If you are new to Spring Cloud Stream, check out their documentation.
The following is a brief excerpt from that document:
Spring Cloud Stream is a framework for building message-driven microservice applications. Spring Cloud Stream builds upon Spring Boot to create standalone, production-grade Spring applications and uses Spring Integration to provide connectivity to message brokers. It provides opinionated configuration of middleware from several vendors, introducing the concepts of persistent publish-subscribe semantics, consumer groups, and partitions.
The releases from this project are hosted in Maven Central.
The easiest way to get started is to include the spring-cloud-starter-stream-solace in your application.
Here is how to include the spring cloud stream starter in your project using Gradle and Maven.
// Solace Spring Cloud Stream Binder
compile("com.solace.spring.cloud:spring-cloud-starter-stream-solace:3.1.0")
Starting in Spring Cloud Stream version 3 the recommended way to define binding and binding names is to use the Functional approach, which uses Spring Cloud Functions. You can learn more in the Spring Cloud Function support and Functional Binding Names sections of the reference guide.
Given this example app:
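import java.util.function.Function;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class SampleAppApplication {
    public static void main(String[] args) {
        SpringApplication.run(SampleAppApplication.class, args);
    }

    // Bound by Spring Cloud Stream as uppercase-in-0 (input) and uppercase-out-0 (output)
    @Bean
    public Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }
}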
An applicable Solace configuration file may look like:
spring:
  cloud:
    function:
      definition: uppercase
    stream:
      bindings:
        uppercase-in-0:
          destination: queuename
          group: myconsumergroup
          binder: solace-broker
        uppercase-out-0:
          destination: uppercase/topic
          binder: solace-broker
      binders:
        solace-broker:
          type: solace
          environment:
            solace:
              java:
                host: tcp://localhost:55555
                msgVpn: default
                clientUsername: default
                clientPassword: default
                connectRetries: -1
                reconnectRetries: -1
Notice that the latter half of this configuration actually originates from the JCSMP Spring Boot Auto-Configuration project.
Configuration of the Solace Spring Cloud Stream Binder is done through Spring Boot’s externalized configuration. This is where users can control the binder’s configuration options as well as the Solace Java API properties.
As for auto-configuration-related options required for auto-connecting to Solace message brokers, refer to the JCSMP Spring Boot Auto-Configuration documentation.
For general binder configuration options and properties, refer to the Spring Cloud Stream Reference Documentation.
The following properties are available for Solace consumers only and must be prefixed with spring.cloud.stream.solace.bindings.<bindingName>.consumer. where bindingName looks something like functionName-in-0 as defined in Functional Binding Names.
See SolaceCommonProperties and SolaceConsumerProperties for the most updated list.
- provisionDurableQueue
  Whether to provision durable queues for non-anonymous consumer groups. This should only be set to false if you have externally pre-provisioned the required queue on the message broker.
  Default: true
  See: Generated Queue Name Syntax
- provisionSubscriptionsToDurableQueue
  Whether to add topic subscriptions to durable queues for non-anonymous consumer groups. This should only be set to false if you have externally pre-added the required topic subscriptions (the destination topic should be added at minimum) on the consumer group’s queue on the message broker. This property also applies to topics added by the queueAdditionalSubscriptions property.
  Default: true
- queueNamePrefix
  Naming prefix for all queues.
  Default: "scst"
  See: Generated Queue Name Syntax
- useFamiliarityInQueueName
  When set to true, the familiarity modifier, wk/an, is included in the generated queue name.
  Default: true
  See: Generated Queue Name Syntax
- useDestinationEncodingInQueueName
  When set to true, the destination encoding (plain) is included in the generated queue name.
  Default: true
  See: Generated Queue Name Syntax
- useGroupNameInQueueName
  Whether to include the group name in the queue name for non-anonymous consumer groups.
  Default: true
  See: Generated Queue Name Syntax
  ❗ If set to false, all consumers of the same destination which also have this set to false will consume from the same queue regardless of their configured group names.
- queueAccessType
  Access type for the consumer group queue.
  Default: EndpointProperties.ACCESSTYPE_NONEXCLUSIVE
- queuePermission
  Permissions for the consumer group queue.
  Default: EndpointProperties.PERMISSION_CONSUME
- queueDiscardBehaviour
  If specified, whether to notify sender if a message fails to be enqueued to the consumer group queue.
  Default: null
- queueMaxMsgRedelivery
  Sets the maximum message redelivery count on the consumer group queue. (Zero means retry forever).
  Default: null
- queueMaxMsgSize
  Maximum message size for the consumer group queue.
  Default: null
- queueQuota
  Message spool quota for the consumer group queue.
  Default: null
- queueRespectsMsgTtl
  Whether the consumer group queue respects Message TTL.
  Default: null
- queueAdditionalSubscriptions
  An array of additional topic subscriptions to be applied on the consumer group queue.
  These subscriptions may also contain wildcards.
  The prefix property is not applied on these subscriptions.
  Default: String[0]
- polledConsumerWaitTimeInMillis
  Maximum wait time for polled consumers to receive a message from their consumer group queue.
  Default: 100
- flowPreRebindWaitTimeout
  The maximum time to wait for all unacknowledged messages to be acknowledged before a flow receiver rebind. Will wait forever if set to a value less than 0.
  Default: 10000
- autoBindErrorQueue
  Whether to automatically create a durable error queue to which messages will be republished when message processing failures are encountered. Only applies once all internal retries have been exhausted.
  Default: false
  💡 Your ACL Profile must allow for publishing to this queue if you decide to use autoBindErrorQueue.
- provisionErrorQueue
  Whether to provision durable queues for error queues when autoBindErrorQueue is true. This should only be set to false if you have externally pre-provisioned the required queue on the message broker.
  Default: true
  See: Generated Error Queue Name Syntax
- errorQueueNameOverride
  A custom error queue name.
  Default: null
  See: Generated Error Queue Name Syntax
- useGroupNameInErrorQueueName
  Whether to include the group name in the error queue name for non-anonymous consumer groups.
  Default: true
  See: Generated Error Queue Name Syntax
  ❗ If set to false, all consumers of the same destination which also have this set to false will republish failed messages to the same error queue regardless of their configured group names.
- errorQueueMaxDeliveryAttempts
  Maximum number of attempts to send a failed message to the error queue. When all delivery attempts have been exhausted, the failed message will be requeued.
  Default: 3
- errorQueueAccessType
  Access type for the error queue.
  Default: EndpointProperties.ACCESSTYPE_NONEXCLUSIVE
- errorQueuePermission
  Permissions for the error queue.
  Default: EndpointProperties.PERMISSION_CONSUME
- errorQueueDiscardBehaviour
  If specified, whether to notify sender if a message fails to be enqueued to the error queue.
  Default: null
- errorQueueMaxMsgRedelivery
  Sets the maximum message redelivery count on the error queue. (Zero means retry forever).
  Default: null
- errorQueueMaxMsgSize
  Maximum message size for the error queue.
  Default: null
- errorQueueQuota
  Message spool quota for the error queue.
  Default: null
- errorQueueRespectsMsgTtl
  Whether the error queue respects Message TTL.
  Default: null
- errorMsgDmqEligible
  The eligibility for republished messages to be moved to a Dead Message Queue.
  Default: null
- errorMsgTtl
  The number of milliseconds before republished messages are discarded or moved to a Dead Message Queue.
  Default: null
The following properties are available for Solace producers only and must be prefixed with spring.cloud.stream.solace.bindings.<bindingName>.producer. where bindingName looks something like functionName-out-0 as defined in Functional Binding Names.
See SolaceCommonProperties and SolaceProducerProperties for the most updated list.
- headerExclusions
  The list of headers to exclude from the published message. Excluding Solace message headers is not supported.
  Default: Empty List<String>
- nonserializableHeaderConvertToString
  When set to true, irreversibly convert non-serializable headers to strings. An exception is thrown otherwise.
  Default: false
  ❗ Non-serializable headers should have a meaningful toString() implementation. Otherwise enabling this feature may result in potential data loss.
- provisionDurableQueue
  Whether to provision durable queues for non-anonymous consumer groups. This should only be set to false if you have externally pre-provisioned the required queue on the message broker.
  Default: true
  See: Generated Queue Name Syntax
- provisionSubscriptionsToDurableQueue
  Whether to add topic subscriptions to durable queues for non-anonymous consumer groups. This should only be set to false if you have externally pre-added the required topic subscriptions (the destination topic should be added at minimum) on the consumer group’s queue on the message broker. This property also applies to topics added by the queueAdditionalSubscriptions property.
  Default: true
- queueNamePrefix
  Naming prefix for all queues.
  Default: "scst"
  See: Generated Queue Name Syntax
- useFamiliarityInQueueName
  When set to true, the familiarity modifier, wk/an, is included in the generated queue name.
  Default: true
  See: Generated Queue Name Syntax
- useDestinationEncodingInQueueName
  When set to true, the destination encoding (plain) is included in the generated queue name.
  Default: true
  See: Generated Queue Name Syntax
- queueAccessType
  Access type for the required consumer group queue.
  Default: EndpointProperties.ACCESSTYPE_NONEXCLUSIVE
- queuePermission
  Permissions for the required consumer group queue.
  Default: EndpointProperties.PERMISSION_CONSUME
- queueDiscardBehaviour
  If specified, whether to notify sender if a message fails to be enqueued to the required consumer group queue.
  Default: null
- queueMaxMsgRedelivery
  Sets the maximum message redelivery count on the required consumer group queue. (Zero means retry forever).
  Default: null
- queueMaxMsgSize
  Maximum message size for the required consumer group queue.
  Default: null
- queueQuota
  Message spool quota for the required consumer group queue.
  Default: null
- queueRespectsMsgTtl
  Whether the required consumer group queue respects Message TTL.
  Default: null
- queueAdditionalSubscriptions
  A mapping of required consumer groups to arrays of additional topic subscriptions to be applied on each consumer group’s queue.
  These subscriptions may also contain wildcards.
  The prefix property is not applied on these subscriptions.
  Default: Empty Map<String,String[]>
Solace-defined Spring headers to get/set Solace metadata from/to Spring Message headers.
solace_ is a header space reserved for Solace-defined headers. Creating new solace_-prefixed headers is not supported. Doing so may cause unexpected side-effects in future versions of this binder.
🔥 Refer to each header’s documentation for their expected usage scenario. Using headers outside of their intended type and access-control is not supported.
ℹ️ Header inheritance applies to Solace message headers in processor message handlers:
These headers are to get/set Solace message properties.
💡 Use SolaceHeaders instead of hardcoding the header names. This class also contains the same documentation that you see here.
Header Name | Type | Access | Description |
---|---|---|---|
| | | Read/Write | The message ID (a string for an application-specific message identifier). This is the |
| | | Read/Write | The application message type. This is the |
| | | Read/Write | The correlation ID. |
| | | Read | The destination this message was published to. |
| | | Read | Whether one or more messages have been discarded prior to the current message. |
| | | Read/Write | Whether the message is eligible to be moved to a Dead Message Queue. |
| | | Read/Write | The UTC time (in milliseconds, from midnight, January 1, 1970 UTC) when the message is supposed to expire. |
| | | Read/Write | The HTTP content encoding header value from interaction with an HTTP client. |
| | | Read/Write | Priority value in the range of 0–255, or -1 if it is not set. |
| | | Read | The receive timestamp (in milliseconds, from midnight, January 1, 1970 UTC). |
| | | Read | Indicates if the message has been delivered by the broker to the API before. |
| | | Read/Write | The replyTo destination for the message. |
| | | Read/Write | The Sender ID for the message. |
| | | Read/Write | The send timestamp (in milliseconds, from midnight, January 1, 1970 UTC). |
| | | Read/Write | The sequence number. |
| | | Read/Write | The number of milliseconds before the message is discarded or moved to a Dead Message Queue. |
| | | Read/Write | When an application sends a message, it can optionally attach application-specific data along with the message, such as user data. |
These headers are to get/set Solace Spring Cloud Stream Binder properties.
These can be used for:
- Getting/Setting Solace Binder metadata
- Directive actions for the binder when producing/consuming messages
💡 Use SolaceBinderHeaders instead of hardcoding the header names. This class also contains the same documentation that you see here.
Header Name | Type | Access | Default Value | Description |
---|---|---|---|---|
|
|
Write |
A CorrelationData instance for messaging confirmations |
|
|
|
Read |
|
A static number set by the publisher to indicate the Spring Cloud Stream Solace message version. |
|
|
Internal Binder Use Only |
Is |
|
|
|
Internal Binder Use Only |
A JSON String array of header names where each entry indicates that that header’s value was serialized by a Solace Spring Cloud Stream binder before publishing it to a broker. |
|
|
|
Internal Binder Use Only |
|
The encoding algorithm used to encode the headers indicated by |
By default, generated consumer group queue names have the following form:
<prefix>/<familiarity-modifier>/<group>/<destination-encoding>/<encoded-destination>
- prefix
  A static prefix as indicated by the queueNamePrefix configuration option.
- familiarity-modifier
  Indicates the durability of the consumer group (wk for well-known or an for anonymous). Can be enabled/disabled with the useFamiliarityInQueueName config option.
- group
  The consumer group name. Can be enabled/disabled for consumers with the useGroupNameInQueueName consumer config option.
- destination-encoding
  Indicates the encoding scheme used to encode the destination in the queue name (currently only plain is supported). Can be enabled/disabled with the useDestinationEncodingInQueueName config option.
- encoded-destination
  The encoded destination as per <destination-encoding>.
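As a rough illustration of how these segments combine, the following Java sketch assembles a queue name from the options described above. The class QueueNameSketch and its method are hypothetical and are not part of the binder. The sketch assumes that the plain encoding leaves the destination unchanged and that the group segment is always kept for anonymous groups, which may not match the binder's behavior in every case (for example, for destinations containing wildcards).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, NOT the binder's implementation. It only mirrors the documented form:
// <prefix>/<familiarity-modifier>/<group>/<destination-encoding>/<encoded-destination>
class QueueNameSketch {

    static String queueName(String prefix, boolean anonymous, String group, String destination,
                            boolean useFamiliarityInQueueName,
                            boolean useGroupNameInQueueName,
                            boolean useDestinationEncodingInQueueName) {
        List<String> segments = new ArrayList<>();
        segments.add(prefix);
        if (useFamiliarityInQueueName) {
            segments.add(anonymous ? "an" : "wk"); // anonymous vs. well-known
        }
        // Assumption: useGroupNameInQueueName only affects non-anonymous groups.
        if (anonymous || useGroupNameInQueueName) {
            segments.add(group);
        }
        if (useDestinationEncodingInQueueName) {
            segments.add("plain"); // the only encoding the document lists
        }
        segments.add(destination); // assumption: "plain" keeps the destination as-is
        return String.join("/", segments);
    }

    public static void main(String[] args) {
        // Values taken from the sample configuration earlier in this document.
        System.out.println(queueName("scst", false, "myconsumergroup", "queuename", true, true, true));
    }
}
```

With the defaults and the sample binding shown earlier, this sketch yields scst/wk/myconsumergroup/plain/queuename. Check the queue name actually provisioned on your broker before relying on it for pre-provisioning.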
By default, generated error queue names have the following form:
<prefix>/error/<familiarity-modifier>/<group>/<destination-encoding>/<encoded-destination>
The definition of each segment of the error queue name matches that from Generated Queue Name Syntax, with the following exceptions:
- group
  The consumer group name. Can be enabled/disabled with the useGroupNameInErrorQueueName consumer config option.
The error queue name can also be manually overridden with the errorQueueNameOverride consumer config option.
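The following Java sketch shows one way to read the error queue naming rules. ErrorQueueNameSketch is a hypothetical class, not the binder's code. It assumes a non-anonymous consumer group (hence the wk modifier), that the plain encoding leaves the destination unchanged, and that a non-null errorQueueNameOverride replaces the whole generated name; verify that last assumption against your binder version.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, NOT the binder's implementation. It mirrors the documented form:
// <prefix>/error/<familiarity-modifier>/<group>/<destination-encoding>/<encoded-destination>
class ErrorQueueNameSketch {

    static String errorQueueName(String prefix, String group, String destination,
                                 boolean useFamiliarityInQueueName,
                                 boolean useGroupNameInErrorQueueName,
                                 boolean useDestinationEncodingInQueueName,
                                 String errorQueueNameOverride) {
        if (errorQueueNameOverride != null) {
            // Assumption: the override is used verbatim as the queue name.
            return errorQueueNameOverride;
        }
        List<String> segments = new ArrayList<>();
        segments.add(prefix);
        segments.add("error");
        if (useFamiliarityInQueueName) {
            segments.add("wk"); // assumption: error queues belong to well-known groups
        }
        if (useGroupNameInErrorQueueName) {
            segments.add(group);
        }
        if (useDestinationEncodingInQueueName) {
            segments.add("plain");
        }
        segments.add(destination);
        return String.join("/", segments);
    }

    public static void main(String[] args) {
        System.out.println(errorQueueName("scst", "myconsumergroup", "queuename", true, true, true, null));
    }
}
```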
Configure Spring Cloud Stream’s concurrency consumer property to enable concurrent message consumption for a particular consumer binding.
Note that there are a few limitations:
1. concurrency > 1 is not supported for exclusive queues.
2. concurrency > 1 is not supported for consumer bindings which are a part of anonymous consumer groups.
3. concurrency > 1 is ignored for polled consumers.
4. Setting provisionDurableQueue to false disables endpoint configuration validation, meaning that point 1 cannot be validated. In this scenario, it is the developer’s responsibility to ensure that point 1 is followed.
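When the binder cannot validate the queue itself, an application-side check along these lines may help catch a misconfiguration early. This is a minimal sketch of the limitations listed above; ConcurrencyCheckSketch, its enum, and its method are hypothetical names and do not exist in the binder.

```java
import java.util.Optional;

// Hypothetical pre-flight check for the concurrency limitations described above.
class ConcurrencyCheckSketch {

    enum AccessType { EXCLUSIVE, NON_EXCLUSIVE }

    /** Returns why concurrency > 1 is unsupported or ignored, or empty if the setting is fine. */
    static Optional<String> problem(int concurrency, AccessType accessType,
                                    boolean anonymousGroup, boolean polledConsumer) {
        if (concurrency <= 1) {
            return Optional.empty();
        }
        if (accessType == AccessType.EXCLUSIVE) {
            return Optional.of("concurrency > 1 is not supported for exclusive queues");
        }
        if (anonymousGroup) {
            return Optional.of("concurrency > 1 is not supported for anonymous consumer groups");
        }
        if (polledConsumer) {
            return Optional.of("concurrency > 1 is ignored for polled consumers");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // A pre-provisioned exclusive queue with concurrency 4 is flagged.
        problem(4, AccessType.EXCLUSIVE, false, false).ifPresent(System.out::println);
    }
}
```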
Message handlers can disable auto-acknowledgement and manually invoke the acknowledgement callback as follows:
public void consume(Message<?> message) {
    AcknowledgmentCallback acknowledgmentCallback = StaticMessageHeaderAccessor.getAcknowledgmentCallback(message); // (1)
    acknowledgmentCallback.noAutoAck(); // (2)
    try {
        AckUtils.accept(acknowledgmentCallback); // (3)
    } catch (SolaceAcknowledgmentException e) {} // (4)
}
(1) Get the message’s acknowledgement callback header
(2) Disable auto-acknowledgement
(3) Acknowledge the message with the ACCEPT status
(4) Handle any acknowledgment exceptions (mostly SolaceStaleMessageException)
Refer to the AckUtils documentation and AcknowledgmentCallback documentation for more info on these objects.
💡 If manual acknowledgement is to be done outside of the message handler’s thread, then make sure auto-acknowledgement is disabled within the message handler’s thread and not an external one. Otherwise, the binder will auto-acknowledge the message when the message handler returns.
For each acknowledgement status, the binder will perform the following actions:
Status | Action |
---|---|
ACCEPT | Acknowledge the message. |
REJECT | If Refer to Failed Consumer Message Error Handling for more info. |
REQUEUE | If the consumer is in a defined consumer group, rebind the consumer flow. Otherwise, a Refer to Message Redelivery for more info. |
❗
|
Acknowledgements may throw Example: A |
ℹ️ Manual acknowledgements do not support any application-internal error handling strategies (e.g. retry template, error channel forwarding). Also, throwing an exception in the message handler will always acknowledge the message in some way, regardless of whether auto-acknowledgment is disabled.
💡 When acknowledging messages asynchronously, message consumption is likely to stall if those messages aren’t acknowledged in a timely manner, due to the consumer queue’s configured "Maximum Delivered Unacknowledged Messages per Flow". This property can be configured for dynamically created queues by using queue templates. However, note that as per our documentation, anonymous consumer group queues (i.e. temporary queues) will not match a queue template’s name filter. Only the queue template defined in the client profile’s "Copy Settings From Queue Template" setting will apply to those.
Spring Cloud Stream has a reserved message header called scst_targetDestination (retrievable via BinderHeaders.TARGET_DESTINATION), which allows for messages to be redirected from their bindings' configured destination to the target destination specified by this header.
For this binder’s implementation of this header, the target destination defines the exact Solace topic to which a message will be sent. That is, no post-processing is done for this header (e.g. prefix is not applied).
If you want to apply a destination post-processing step, such as the prefix, you will need to apply it directly to the header itself:
public class MyMessageBuilder {
    @Value("${spring.cloud.stream.solace.bindings.<bindingName>.producer.prefix}") // (1)
    String prefix;

    public Message<String> buildMeAMessage() {
        return MessageBuilder.withPayload("payload")
                .setHeader(BinderHeaders.TARGET_DESTINATION, prefix + "new-target-destination") // (2)
                .build();
    }
}
(1) Retrieve your binding’s configured prefix.
(2) Apply the prefix to the target destination header.
Also, this header is cleared by the message’s producer before it is sent off to the message broker. So you should attach the target destination to your message payload if you want to get that information on the consumer-side.
The Spring Cloud Stream framework already provides a number of application-internal reprocessing strategies for failed messages during message consumption. Refer to the Spring Cloud Stream reference documentation to read more about them.
However, after all internal error handling strategies have been exhausted, the Solace implementation of the binder would either:
- Redeliver the failed message (default)
- Republish the message to another queue (an error queue) for an external application/binding to process
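The choice between these two outcomes can be sketched as follows, combining it with the autoBindErrorQueue and errorQueueMaxDeliveryAttempts consumer options described earlier. FailureHandlingSketch is a hypothetical class and is not the binder's implementation; the republish callback stands in for the actual publish to the error queue, and the sketch only covers consumers in a defined (non-anonymous) consumer group.

```java
import java.util.function.BooleanSupplier;

// Hypothetical model of what happens once all internal retries are exhausted.
class FailureHandlingSketch {

    enum Outcome { REPUBLISHED_TO_ERROR_QUEUE, REQUEUED }

    /**
     * @param republish stand-in for publishing to the error queue; returns true on success
     */
    static Outcome handleFailure(boolean autoBindErrorQueue,
                                 int errorQueueMaxDeliveryAttempts,
                                 BooleanSupplier republish) {
        if (autoBindErrorQueue) {
            for (int attempt = 1; attempt <= errorQueueMaxDeliveryAttempts; attempt++) {
                if (republish.getAsBoolean()) {
                    return Outcome.REPUBLISHED_TO_ERROR_QUEUE;
                }
            }
        }
        // Default path, and the fallback when every republish attempt failed.
        return Outcome.REQUEUED;
    }

    public static void main(String[] args) {
        System.out.println(handleFailure(false, 3, () -> true));
        System.out.println(handleFailure(true, 3, () -> true));
    }
}
```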
A simple error handling strategy in which failed messages are redelivered from the consumer group’s queue. This is very similar to simply enabling the retry template (setting maxAttempts to a value greater than 1), but allows for the failed messages to be re-processed by the message broker.
❗ The Solace API used in this binder implementation does not support individual message redelivery. Here is what happens under the hood when this is triggered:
Meaning that if unacknowledged messages are not processed in a timely manner, this operation will stall and potentially cause unnecessary message duplication.
First, it must be noted that an Error Queue is different from a Dead Message Queue (DMQ). In particular, a DMQ is used to capture re-routed failed messages as a consequence of Solace PubSub+ messaging features such as TTL expiration or exceeding a message’s max redelivery count. Whereas the purpose of an Error Queue is to capture re-routed messages which have been successfully consumed from the message broker, yet cannot be processed by the application.
An Error Queue can be provisioned for a particular consumer group by setting the autoBindErrorQueue consumer config option to true. This Error Queue is simply another durable queue which is named as per the Generated Error Queue Name Syntax section. And like the queues used for consumer groups, its endpoint properties can be configured by means of any consumer properties whose names begin with "errorQueue".
ℹ️ Error Queues should not be used with anonymous consumer groups. Since the names of anonymous consumer groups, and in turn the name of their would-be Error Queues, are randomly generated at runtime, it would provide little value to create bindings to these Error Queues because of their unpredictable naming and temporary existence. Also, your environment will be polluted with orphaned Error Queues whenever these consumers rebind.
By default, asynchronous producer errors aren’t handled by the framework. Producer error channels can be enabled using the errorChannelEnabled producer config option.
Beyond that, this binder also supports using a Future to wait for publish confirmations. See Publisher Confirmations for more info.
For each message you can create a new CorrelationData instance and set it as the value of your message’s SolaceBinderHeaders.CONFIRM_CORRELATION header.
ℹ️ CorrelationData can be extended to add more correlation info. The SolaceBinderHeaders.CONFIRM_CORRELATION header is not reflected in the actual message published to the broker.
Now using CorrelationData.getFuture().get(), you can wait for a publish acknowledgment from the broker. If the publish failed, then this future will throw an exception.
For example:
@Autowired
private StreamBridge streamBridge;

public void send(String payload, long timeout, TimeUnit unit) {
    CorrelationData correlationData = new CorrelationData();
    // The payload is a String, so the message type is Message<String>
    Message<String> message = MessageBuilder.withPayload(payload)
            .setHeader(SolaceBinderHeaders.CONFIRM_CORRELATION, correlationData)
            .build();

    streamBridge.send("output-destination", message);

    try {
        correlationData.getFuture().get(timeout, unit);
        // Do success logic
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        // Do failure logic
    }
}
For more information about Spring Cloud Stream try these resources:
For more information about Solace technology in general please visit these resources:
- The Solace Developer Portal website at: https://solace.dev
- Ask the Solace community