Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish events to Spring Cloud Stream channels #5

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!--
~ Copyright (c) 2010-2021. Axon Framework
~
Expand All @@ -14,14 +14,15 @@
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<groupId>org.axonframework.extensions.springcloud</groupId>
<artifactId>axon-springcloud-parent</artifactId>
<version>4.5-SNAPSHOT</version>
<modules>
<module>springcloud</module>
<module>springcloud-streams</module>
<module>springcloud-spring-boot-autoconfigure</module>
<module>springcloud-spring-boot-starter</module>
</modules>
Expand Down Expand Up @@ -52,8 +53,11 @@

<axon.version>4.5.3</axon.version>
<spring-cloud-release.version>Hoxton.SR12</spring-cloud-release.version>
<spring-cloud-stream.version>Horsham.SR12</spring-cloud-stream.version>

<spring.version>5.3.9</spring.version>
<spring-messaging.version>{spring.version}</spring-messaging.version>
<spring-integration.version>5.4.8</spring-integration.version>
<spring.boot.version>2.3.12.RELEASE</spring.boot.version>

<slf4j.version>1.7.32</slf4j.version>
Expand Down
100 changes: 100 additions & 0 deletions springcloud-streams/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright (c) 2010-2020. Axon Framework
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The notice should extend to 2021, as this commit is from 2021.

~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>axon-springcloud-parent</artifactId>
<groupId>org.axonframework.extensions.springcloud</groupId>
<version>4.5-SNAPSHOT</version>
</parent>

<artifactId>axon-springcloud-streams</artifactId>

<name>Axon Framework Spring Cloud Stream Extension</name>
<description>
Module containing Message Adapter implementations for distributing events over Spring Cloud Stream.
</description>

<packaging>jar</packaging>

<dependencies>

<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-messaging</artifactId>
<version>${axon.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<version>${spring-messaging.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>${spring-integration.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
<optional>true</optional>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifestEntries>
<Automatic-Module-Name>org.axonframework.extensions.springcloud</Automatic-Module-Name>
</manifestEntries>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
/*
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.extensions.springcloud.eventhandling;

import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.Registration;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.extensions.springcloud.eventhandling.converter.SpringMessageConverter;
import org.axonframework.messaging.SubscribableMessageSource;
import org.springframework.integration.handler.AbstractMessageProducingHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Predicate;

import static java.util.Collections.singletonList;
import static org.axonframework.common.BuilderUtils.assertNonNull;

/**
* Initialize the adapter to publish all incoming events to the subscribed processors. Note that this instance should
* be registered as a consumer of a Spring Message Channel.
*
* @author Mehdi chitforoosh
* @since 4.5
*/
public class SpringStreamMessageProcessor extends AbstractMessageProducingHandler implements SubscribableMessageSource<EventMessage<?>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we call it the SpringStreamMessageSource instead of the SpringStreamMessageProcessor?

Calling it a processor to closely aligns with Axon's Event Processor logic.
Hence, I assume it will confuse users.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, I notice there already is a SpringStreamMessageSource. On top of that, it also extends the AbstractMessageProducingHandler class and implements the SubscribableMessageSource.
Would you mind explaining why both exist in this pull request?


private final SubscribableMessageSource<EventMessage<?>> messageSource;
private final CopyOnWriteArrayList<Consumer<List<? extends EventMessage<?>>>> messageProcessors = new CopyOnWriteArrayList<>();
private Predicate<? super EventMessage<?>> filter;
private final SpringMessageConverter converter;

/**
* Instantiate a {@link SpringStreamMessageProcessor} based on the fields contained in the {@link SpringStreamMessageProcessor.Builder}.
* The {@link SubscribableMessageSource} is a <b>hard requirement</b> and thus should be provided.
* The {@link SpringMessageConverter} is a <b>hard requirement</b> and thus should be provided.
* <p>
* Will validate that the {@link SubscribableMessageSource} and {@link SpringMessageConverter} and {@link EventBus} are not {@code null},
* and will throw an {@link AxonConfigurationException} if for either of them this holds.
*
* @param builder the {@link SpringStreamMessageProcessor.Builder} used to instantiate a {@link SpringStreamMessageProcessor} instance
*/
protected SpringStreamMessageProcessor(SpringStreamMessageProcessor.Builder builder) {
builder.validate();
this.filter = builder.filter;
this.messageSource = builder.messageSource;
this.converter = builder.converter;
if (builder.eventBus != null) {
this.messageProcessors.addAll(singletonList(builder.eventBus::publish));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should add EventBus by default here? @smcvb what do you think? In my opinion, we could leave this decision to the users of the API, but I am afraid that in that case, we are missing something like MultiSubscribableMessageSource.java allowing them to subscribe to multiple sources: eventBus/eventStore, streamSource, ...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly, I think this is a bad idea.
As you point @idugalic, we should have a form of MultiSubscribableMessageSource.

Simply publishing the Spring Messages as new events on the local EventBus is very likely going to cause a lot of issues.
Let alone that it leaks the concerns of another service's Events into the domain connecting to it.

}
// Subscribes this event message handler to the message source.
this.messageSource.subscribe(this::handle);
}

/**
* Instantiate a Builder to be able to create a {@link SpringStreamMessageProcessor}.
* The {@link SubscribableMessageSource} is a <b>hard requirement</b> and as such should be provided.
* The {@link SpringMessageConverter} is a <b>hard requirement</b> and thus should be provided.
*
* @return a Builder to be able to create a {@link SpringStreamMessageProcessor}.
*/
public static SpringStreamMessageProcessor.Builder builder() {
return new SpringStreamMessageProcessor.Builder();
}

@Override
public Registration subscribe(Consumer<List<? extends EventMessage<?>>> messageProcessor) {
messageProcessors.add(messageProcessor);
return () -> messageProcessors.remove(messageProcessor);
}

/**
* If allows by the filter, wraps the given {@link EventMessage} in a {@link GenericMessage} ands sends it to the
* configured {@link MessageChannel}.
*
* @param events the events to handle
*/
protected void handle(List<? extends EventMessage<?>> events) {
events.stream()
.filter(filter)
.forEach(event -> this.sendOutput(converter.createSpringMessage(event), null, false));
}

/**
* Handles the given {@link Message}. If the filter refuses the message, it is ignored.
*
* @param message The message containing the event to publish
*/
@Override
protected void handleMessageInternal(Message<?> message) {
Optional<EventMessage<?>> optional = converter.readSpringMessage(message);
optional.ifPresent(eventMessage -> {
List<? extends EventMessage<?>> messages = singletonList(eventMessage);
for (Consumer<List<? extends EventMessage<?>>> messageProcessor : messageProcessors) {
messageProcessor.accept(messages);
}
});
}

/**
* Set filter to remove filtered messages
*
* @param filter
*/
public void setFilter(Predicate<? super EventMessage<?>> filter) {
this.filter = filter;
}

/**
* Builder class to instantiate a {@link SpringStreamMessageProcessor}.
* The {@link SubscribableMessageSource} is a <b>hard requirement</b> and thus should be provided.
* The {@link SpringMessageConverter} is a <b>hard requirement</b> and thus should be provided.
*/
public static class Builder {

private SubscribableMessageSource<EventMessage<?>> messageSource;
private EventBus eventBus;
private Predicate<? super EventMessage<?>> filter = m -> true;
private SpringMessageConverter converter;

/**
* Sets the filter to send specific event messages.
*
* @param filter The filter to send filtered event messages
* @return the current Builder instance, for fluent interfacing
*/
public SpringStreamMessageProcessor.Builder filter(Predicate<? super EventMessage<?>> filter) {
this.filter = filter;
return this;
}

/**
* Sets the messageSource for subscribing handler.
*
* @param messageSource The messageSource to subscribe
* @return the current Builder instance, for fluent interfacing
*/
public SpringStreamMessageProcessor.Builder messageSource(SubscribableMessageSource<EventMessage<?>> messageSource) {
assertNonNull(messageSource, "messageSource may not be null");
this.messageSource = messageSource;
return this;
}

/**
* Sets the eventBus to publish received event message.
*
* @param eventBus The eventBus to publish event messages
* @return the current Builder instance, for fluent interfacing
*/
public SpringStreamMessageProcessor.Builder eventBus(EventBus eventBus) {
this.eventBus = eventBus;
return this;
}

/**
* Sets the converter to convert spring messages to event messages and versa.
*
* @param converter The converter to convert messages
* @return the current Builder instance, for fluent interfacing
*/
public SpringStreamMessageProcessor.Builder converter(SpringMessageConverter converter) {
assertNonNull(converter, "converter may not be null");
this.converter = converter;
return this;
}

/**
* Initializes a {@link SpringStreamMessageProcessor} as specified through this Builder.
*
* @return a {@link SpringStreamMessageProcessor} as specified through this Builder
*/
public SpringStreamMessageProcessor build() {
return new SpringStreamMessageProcessor(this);
}

/**
* Validate whether the fields contained in this Builder as set accordingly.
*
* @throws AxonConfigurationException if one field is asserted to be incorrect according to the Builder's specifications
*/
protected void validate() {
assertNonNull(messageSource, "The MessageSource is a hard requirement and should be provided");
assertNonNull(converter, "The Converter is a hard requirement and should be provided");
}
}

}
Loading