Skip to content

Commit

Permalink
Release 3.2.0 (#279) (#280)
Browse files Browse the repository at this point in the history
* DATAGO-67277: Feature to exclude header on consumer binding

Co-authored-by: mayur-solace <[email protected]>
  • Loading branch information
Nephery and mayur-solace authored Mar 25, 2024
1 parent 5af78f6 commit b30fa68
Show file tree
Hide file tree
Showing 16 changed files with 306 additions and 73 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

<groupId>com.solace.spring.cloud</groupId>
<artifactId>solace-spring-cloud-build</artifactId>
<version>3.1.1-SNAPSHOT</version>
<version>3.2.0-SNAPSHOT</version>
<packaging>pom</packaging>

<name>Solace Spring Cloud Build</name>
Expand All @@ -32,7 +32,7 @@
<!-- Remove this if the next version of solace-spring-boot works fine -->
<spring.boot.version>3.1.5</spring.boot.version>

<solace.spring.cloud.stream-starter.version>4.1.1-SNAPSHOT</solace.spring.cloud.stream-starter.version>
<solace.spring.cloud.stream-starter.version>4.2.0-SNAPSHOT</solace.spring.cloud.stream-starter.version>

<solace.integration.test.support.version>1.0.2</solace.integration.test.support.version>
<solace.integration.test.support.fetch_checkout.skip>false</solace.integration.test.support.fetch_checkout.skip>
Expand Down
7 changes: 4 additions & 3 deletions solace-spring-cloud-bom/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Consult the table below to determine which version of the BOM you need to use:
| 2021.0.6 | 2.5.0 | 2.7.x |
| 2022.0.2 | 3.0.0 | 3.0.x |
| 2022.0.4 | 3.1.0 | 3.1.x |
| 2022.0.4 | 3.2.0 | 3.1.x |

## Including the BOM

Expand All @@ -38,7 +39,7 @@ In addition to showing how to include the BOM, the following snippets also shows
<dependency>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>solace-spring-cloud-bom</artifactId>
<version>3.1.0</version>
<version>3.2.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down Expand Up @@ -66,7 +67,7 @@ apply plugin: 'io.spring.dependency-management'
dependencyManagement {
imports {
mavenBom "com.solace.spring.cloud:solace-spring-cloud-bom:3.1.0"
mavenBom "com.solace.spring.cloud:solace-spring-cloud-bom:3.2.0"
}
}
Expand All @@ -78,7 +79,7 @@ dependencies {
### Using it with Gradle 5
```groovy
dependencies {
implementation(platform("com.solace.spring.cloud:solace-spring-cloud-bom:3.1.0"))
implementation(platform("com.solace.spring.cloud:solace-spring-cloud-bom:3.2.0"))
implementation("com.solace.spring.cloud:spring-cloud-starter-stream-solace")
}
```
4 changes: 2 additions & 2 deletions solace-spring-cloud-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
<parent>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>solace-spring-cloud-build</artifactId>
<version>3.1.1-SNAPSHOT</version>
<version>3.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>solace-spring-cloud-bom</artifactId>
<packaging>pom</packaging>
<version>3.1.1-SNAPSHOT</version>
<version>3.2.0-SNAPSHOT</version>

<name>Solace Spring Cloud BOM</name>
<description>BOM for Solace Spring Cloud</description>
Expand Down
2 changes: 1 addition & 1 deletion solace-spring-cloud-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>solace-spring-cloud-build</artifactId>
<version>3.1.1-SNAPSHOT</version>
<version>3.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
= Spring Cloud Stream Binder for Solace PubSub+
:revnumber: 4.1.0
:revnumber: 4.2.0
:toc: preamble
:toclevels: 3
:icons: font
Expand Down Expand Up @@ -346,6 +346,11 @@ The number of milliseconds before republished messages are discarded or moved to
+
Default: `null`

headerExclusions::
The list of headers to exclude when converting consumed Solace message to Spring message.
+
Default: Empty `List&lt;String&gt;`

==== Solace Producer Properties

The following properties are available for Solace producers only and must be prefixed with `spring.cloud.stream.solace.bindings.&lt;bindingName&gt;.producer.` where `bindingName` looks something like `functionName-out-0` as defined in https://docs.spring.io/spring-cloud-stream/docs/{scst-version}/reference/html/spring-cloud-stream.html#_functional_binding_names[Functional Binding Names].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
<parent>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>solace-spring-cloud-parent</artifactId>
<version>3.1.1-SNAPSHOT</version>
<version>3.2.0-SNAPSHOT</version>
<relativePath>../../solace-spring-cloud-parent/pom.xml</relativePath>
</parent>

<artifactId>spring-cloud-starter-stream-solace</artifactId>
<version>4.1.1-SNAPSHOT</version>
<version>4.2.0-SNAPSHOT</version>
<packaging>jar</packaging>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
<parent>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>solace-spring-cloud-parent</artifactId>
<version>3.1.1-SNAPSHOT</version>
<version>3.2.0-SNAPSHOT</version>
<relativePath>../../solace-spring-cloud-parent/pom.xml</relativePath>
</parent>

<artifactId>spring-cloud-stream-binder-solace-core</artifactId>
<version>4.1.1-SNAPSHOT</version>
<version>4.2.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>Solace Spring Cloud Stream Binder Core</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,13 @@ private void processBatchIfAvailable() {

Message<?> createOneMessage(BytesXMLMessage bytesXMLMessage, AcknowledgmentCallback acknowledgmentCallback) {
setAttributesIfNecessary(bytesXMLMessage, acknowledgmentCallback);
return xmlMessageMapper.map(bytesXMLMessage, acknowledgmentCallback);
return xmlMessageMapper.map(bytesXMLMessage, acknowledgmentCallback, consumerProperties.getExtension());
}

Message<?> createBatchMessage(List<BytesXMLMessage> bytesXMLMessages,
AcknowledgmentCallback acknowledgmentCallback) {
setAttributesIfNecessary(bytesXMLMessages, acknowledgmentCallback);
return xmlMessageMapper.mapBatchMessage(bytesXMLMessages, acknowledgmentCallback);
return xmlMessageMapper.mapBatchMessage(bytesXMLMessages, acknowledgmentCallback, consumerProperties.getExtension());
}

void sendOneToConsumer(final Message<?> message, final BytesXMLMessage bytesXMLMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ protected Object doReceive() {
private Message<?> processMessage(MessageContainer messageContainer) {
AcknowledgmentCallback acknowledgmentCallback = ackCallbackFactory.createCallback(messageContainer);
try {
return xmlMessageMapper.map(messageContainer.getMessage(), acknowledgmentCallback, true);
return xmlMessageMapper.map(messageContainer.getMessage(), acknowledgmentCallback, true, consumerProperties.getExtension());
} catch (Exception e) {
//TODO If one day the errorChannel or attributesHolder can be retrieved, use those instead
logger.warn(e, String.format("XMLMessage %s cannot be consumed. It will be rejected",
Expand All @@ -188,7 +188,7 @@ private Message<List<?>> processBatchIfAvailable() {
return xmlMessageMapper.mapBatchMessage(batchedMessages.get()
.stream()
.map(MessageContainer::getMessage)
.collect(Collectors.toList()), acknowledgmentCallback, true);
.collect(Collectors.toList()), acknowledgmentCallback, true, consumerProperties.getExtension());
} catch (Exception e) {
logger.warn(e, "Message batch cannot be consumed. It will be rejected");
AckUtils.reject(acknowledgmentCallback);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.solace.spring.cloud.stream.binder.properties;

import com.solacesystems.jcsmp.EndpointProperties;
import java.util.ArrayList;
import java.util.List;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.util.Assert;

Expand Down Expand Up @@ -133,6 +135,10 @@ public class SolaceConsumerProperties extends SolaceCommonProperties {
private Long errorMsgTtl = null;
// ------------------------

/**
* The list of headers to exclude when converting consumed Solace message to Spring message.
*/
private List<String> headerExclusions = new ArrayList<>();

public int getBatchMaxSize() {
return batchMaxSize;
Expand Down Expand Up @@ -317,4 +323,12 @@ public Long getErrorMsgTtl() {
public void setErrorMsgTtl(Long errorMsgTtl) {
this.errorMsgTtl = errorMsgTtl;
}

public List<String> getHeaderExclusions() {
return headerExclusions;
}

public void setHeaderExclusions(List<String> headerExclusions) {
this.headerExclusions = headerExclusions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,18 +164,18 @@ public XMLMessage map(Message<?> message, Collection<String> excludedHeaders, bo
}

public Message<List<?>> mapBatchMessage(List<? extends XMLMessage> xmlMessages,
AcknowledgmentCallback acknowledgmentCallback)
AcknowledgmentCallback acknowledgmentCallback, SolaceConsumerProperties solaceConsumerProperties)
throws SolaceMessageConversionException {
return mapBatchMessage(xmlMessages, acknowledgmentCallback, false);
return mapBatchMessage(xmlMessages, acknowledgmentCallback, false, solaceConsumerProperties);
}

public Message<List<?>> mapBatchMessage(List<? extends XMLMessage> xmlMessages,
AcknowledgmentCallback acknowledgmentCallback,
boolean setRawMessageHeader) throws SolaceMessageConversionException {
boolean setRawMessageHeader, SolaceConsumerProperties solaceConsumerProperties) throws SolaceMessageConversionException {
List<Map<String, Object>> batchedHeaders = new ArrayList<>();
List<Object> batchedPayloads = new ArrayList<>();
for (XMLMessage xmlMessage : xmlMessages) {
Message<?> message = mapInternal(xmlMessage).build();
Message<?> message = mapInternal(xmlMessage, solaceConsumerProperties).build();
batchedHeaders.add(message.getHeaders());
batchedPayloads.add(message.getPayload());
}
Expand All @@ -186,20 +186,21 @@ public Message<List<?>> mapBatchMessage(List<? extends XMLMessage> xmlMessages,
.build();
}

public Message<?> map(XMLMessage xmlMessage, AcknowledgmentCallback acknowledgmentCallback)
public Message<?> map(XMLMessage xmlMessage, AcknowledgmentCallback acknowledgmentCallback, SolaceConsumerProperties solaceConsumerProperties)
throws SolaceMessageConversionException {
return map(xmlMessage, acknowledgmentCallback, false);
return map(xmlMessage, acknowledgmentCallback, false, solaceConsumerProperties);
}

public Message<?> map(XMLMessage xmlMessage, AcknowledgmentCallback acknowledgmentCallback,
boolean setRawMessageHeader) throws SolaceMessageConversionException {
return injectRootMessageHeaders(mapInternal(xmlMessage), acknowledgmentCallback, setRawMessageHeader ?
boolean setRawMessageHeader, SolaceConsumerProperties solaceConsumerProperties) throws SolaceMessageConversionException {
return injectRootMessageHeaders(mapInternal(xmlMessage, solaceConsumerProperties), acknowledgmentCallback, setRawMessageHeader ?
xmlMessage : null).build();
}

private AbstractIntegrationMessageBuilder<?> mapInternal(XMLMessage xmlMessage)
private AbstractIntegrationMessageBuilder<?> mapInternal(XMLMessage xmlMessage, SolaceConsumerProperties solaceConsumerProperties)
throws SolaceMessageConversionException {
SDTMap metadata = xmlMessage.getProperties();
List<String> excludedHeaders = solaceConsumerProperties.getHeaderExclusions();

Object payload;
if (xmlMessage instanceof BytesMessage) {
Expand Down Expand Up @@ -247,7 +248,7 @@ private AbstractIntegrationMessageBuilder<?> mapInternal(XMLMessage xmlMessage)

AbstractIntegrationMessageBuilder<?> builder = MESSAGE_BUILDER_FACTORY
.withPayload(payload)
.copyHeaders(map(metadata))
.copyHeaders(map(metadata, excludedHeaders))
.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, xmlMessage.getHTTPContentType());

if (isNullPayload) {
Expand All @@ -261,6 +262,9 @@ private AbstractIntegrationMessageBuilder<?> mapInternal(XMLMessage xmlMessage)
if (!header.getValue().isReadable()) {
continue;
}
if (excludedHeaders != null && excludedHeaders.contains(header.getKey())) {
continue;
}
if (ignoredHeaderProperties.contains(header.getKey())) {
continue;
}
Expand Down Expand Up @@ -320,15 +324,19 @@ SDTMap map(MessageHeaders headers, Collection<String> excludedHeaders, boolean c
return metadata;
}

MessageHeaders map(SDTMap metadata) {
MessageHeaders map(SDTMap metadata, Collection<String> excludedHeaders) {
if (metadata == null) {
return new MessageHeaders(Collections.emptyMap());
}

final Collection<String> exclusionList =
excludedHeaders != null ? excludedHeaders : Collections.emptyList();

Map<String,Object> headers = new HashMap<>();

// Deserialize headers
if (metadata.containsKey(SolaceBinderHeaders.SERIALIZED_HEADERS)) {
if (!exclusionList.contains(SolaceBinderHeaders.SERIALIZED_HEADERS) &&
metadata.containsKey(SolaceBinderHeaders.SERIALIZED_HEADERS)) {
Encoder encoder = null;
if (metadata.containsKey(SolaceBinderHeaders.SERIALIZED_HEADERS_ENCODING)) {
String encoding = rethrowableCall(metadata::getString, SolaceBinderHeaders.SERIALIZED_HEADERS_ENCODING);
Expand All @@ -345,7 +353,9 @@ MessageHeaders map(SDTMap metadata) {
rethrowableCall(metadata::getString, SolaceBinderHeaders.SERIALIZED_HEADERS));

for (String headerName : serializedHeaders) {
if (metadata.containsKey(headerName)) {
if (exclusionList.contains(headerName)) {
continue;
} else if (metadata.containsKey(headerName)) {
byte[] serializedValue = encoder != null ?
encoder.decode(rethrowableCall(metadata::getString, headerName)) :
rethrowableCall(metadata::getBytes, headerName);
Expand All @@ -359,6 +369,7 @@ MessageHeaders map(SDTMap metadata) {
}

metadata.keySet().stream()
.filter(h -> !exclusionList.contains(h))
.filter(h -> !headers.containsKey(h))
.filter(h -> !SolaceBinderHeaderMeta.META.containsKey(h))
.filter(h -> !SolaceHeaderMeta.META.containsKey(h))
Expand All @@ -370,7 +381,8 @@ MessageHeaders map(SDTMap metadata) {
headers.put(h, value);
});

if (metadata.containsKey(SolaceBinderHeaders.MESSAGE_VERSION)) {
if (!exclusionList.contains(SolaceBinderHeaders.MESSAGE_VERSION) &&
metadata.containsKey(SolaceBinderHeaders.MESSAGE_VERSION)) {
int messageVersion = rethrowableCall(metadata::getInteger, SolaceBinderHeaders.MESSAGE_VERSION);
headers.put(SolaceBinderHeaders.MESSAGE_VERSION, messageVersion);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package com.solace.spring.cloud.stream.binder.properties;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class SolaceConsumerPropertiesTest {
@ParameterizedTest
Expand Down Expand Up @@ -81,4 +83,9 @@ public void testFailSetFlowRebindBackOffMultiplier(double multiplier) {
assertThrows(IllegalArgumentException.class, () -> new SolaceConsumerProperties()
.setFlowRebindBackOffMultiplier(multiplier));
}

@Test
void testDefaultHeaderExclusionsListIsEmpty() {
assertTrue(new SolaceConsumerProperties().getHeaderExclusions().isEmpty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.solace.spring.cloud.stream.binder.messaging.HeaderMeta;
import com.solace.spring.cloud.stream.binder.messaging.SolaceBinderHeaderMeta;
import com.solace.spring.cloud.stream.binder.messaging.SolaceBinderHeaders;
import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties;
import com.solace.spring.cloud.stream.binder.test.util.SerializableFoo;
import com.solace.test.integration.junit.jupiter.extension.PubSubPlusExtension;
import com.solacesystems.jcsmp.BytesMessage;
Expand Down Expand Up @@ -339,6 +340,7 @@ public void testPayloadFromJmsToSpring(JCSMPSession jcsmpSession, SoftAssertions
messages.add(mapMessage);
}

SolaceConsumerProperties consumerProperties = new SolaceConsumerProperties();
XMLMessageConsumer messageConsumer = null;
try {
Set<Class<? extends XMLMessage>> processedMessageTypes = new HashSet<>();
Expand All @@ -349,7 +351,7 @@ public void testPayloadFromJmsToSpring(JCSMPSession jcsmpSession, SoftAssertions
public void onReceive(BytesXMLMessage bytesXMLMessage) {
logger.info("Got message " + bytesXMLMessage);
try {
Message<?> msg = xmlMessageMapper.map(bytesXMLMessage, null);
Message<?> msg = xmlMessageMapper.map(bytesXMLMessage, null, consumerProperties);
if (msg.getPayload() instanceof byte[]) {
softly.assertThat(msg.getPayload()).isEqualTo("test".getBytes());
processedMessageTypes.add(BytesMessage.class);
Expand Down Expand Up @@ -428,6 +430,7 @@ public void testSerializedPayloadFromJmsToSpring(JCSMPSession jcsmpSession, Soft
ObjectMessage message = jmsSession.createObjectMessage(payload);
message.setBooleanProperty(SolaceBinderHeaders.SERIALIZED_PAYLOAD, true);

SolaceConsumerProperties consumerProperties = new SolaceConsumerProperties();
XMLMessageConsumer messageConsumer = null;
try {
AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
Expand All @@ -437,7 +440,7 @@ public void testSerializedPayloadFromJmsToSpring(JCSMPSession jcsmpSession, Soft
public void onReceive(BytesXMLMessage bytesXMLMessage) {
logger.info("Got message " + bytesXMLMessage);
try {
softly.assertThat(xmlMessageMapper.map(bytesXMLMessage, null).getPayload())
softly.assertThat(xmlMessageMapper.map(bytesXMLMessage, null, consumerProperties).getPayload())
.isEqualTo(payload);
} catch (Exception e) {
exceptionAtomicReference.set(e);
Expand Down
Loading

0 comments on commit b30fa68

Please sign in to comment.