Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish events to Spring Cloud Stream channels #5

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@
<spring.version>5.1.4.RELEASE</spring.version>
<spring.boot.version>2.1.2.RELEASE</spring.boot.version>
<spring-cloud.version>2.0.1.RELEASE</spring-cloud.version>
<spring-messaging.version>5.1.6.RELEASE</spring-messaging.version>
<spring-integration.version>5.1.4.RELEASE</spring-integration.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<mockito.version>2.15.0</mockito.version>
<jackson.version>2.9.8</jackson.version>
Expand Down
12 changes: 12 additions & 0 deletions springcloud/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<version>${spring-messaging.version}</version>
</dependency>

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>${spring-integration.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package org.axonframework.extensions.springcloud.eventhandling.converter;

import org.axonframework.common.Assert;
import org.axonframework.common.DateTimeUtils;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericDomainEventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.messaging.Headers;
import org.axonframework.messaging.MetaData;
import org.axonframework.serialization.*;
import org.axonframework.serialization.json.JacksonSerializer;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import static org.axonframework.common.DateTimeUtils.formatInstant;
import static org.axonframework.messaging.Headers.MESSAGE_TIMESTAMP;

/**
* Default byte[] spring message to event message converter based on Jackson serializer.
*
* @Mehdi Chitforoosh
* @since 4.1
*/
public class DefaultSpringMessageEventMessageConverter implements SpringMessageEventMessageConverter {

private final Serializer serializer;

public DefaultSpringMessageEventMessageConverter() {
this.serializer = JacksonSerializer.builder().build();
}

public DefaultSpringMessageEventMessageConverter(Serializer serializer) {
Assert.notNull(serializer, () -> "Serializer may not be null");
this.serializer = serializer;
}

@Override
public Message<?> toSpringMessage(EventMessage<?> eventMessage) {
SerializedObject<byte[]> serializedObject = eventMessage.serializePayload(serializer, byte[].class);
Map<String, Object> headers = new HashMap<>();
eventMessage.getMetaData().forEach((k, v) -> headers.put(Headers.MESSAGE_METADATA + "-" + k, v));
Headers.defaultHeaders(eventMessage, serializedObject).forEach((k, v) -> {
if (k.equals(MESSAGE_TIMESTAMP)) {
headers.put(k, formatInstant(eventMessage.getTimestamp()));
} else {
headers.put(k, v);
}
});
return new GenericMessage<>(serializedObject.getData(), new DefaultSpringMessageEventMessageConverter.SettableTimestampMessageHeaders(headers, eventMessage.getTimestamp().toEpochMilli()));

}

@Override
public EventMessage<?> toEventMessage(Message<?> message) {
if (!(message.getPayload() instanceof byte[])) {
throw new IllegalArgumentException("message payload should be byte[]");
}
MessageHeaders headers = message.getHeaders();
if (!headers.keySet().containsAll(Arrays.asList(Headers.MESSAGE_ID, Headers.MESSAGE_TYPE))) {
throw new IllegalArgumentException("axon message id or axon message type doesn't exist.");
}
byte[] payload = (byte[]) message.getPayload();
Map<String, Object> metaData = new HashMap<>();
headers.forEach((k, v) -> {
if (k.startsWith(Headers.MESSAGE_METADATA + "-")) {
metaData.put(k.substring((Headers.MESSAGE_METADATA + "-").length()), v);
}
});
SimpleSerializedObject<byte[]> serializedMessage = new SimpleSerializedObject<>(payload, byte[].class,
Objects.toString(headers.get(Headers.MESSAGE_TYPE)),
Objects.toString(headers.get(Headers.MESSAGE_REVISION), null));
SerializedMessage<?> delegateMessage = new SerializedMessage<>(Objects.toString(headers.get(Headers.MESSAGE_ID)),
new LazyDeserializingObject<>(serializedMessage, serializer),
new LazyDeserializingObject<>(MetaData.from(metaData)));
String timestamp = Objects.toString(headers.get(MESSAGE_TIMESTAMP));
if (headers.containsKey(Headers.AGGREGATE_ID)) {
return new GenericDomainEventMessage<>(Objects.toString(headers.get(Headers.AGGREGATE_TYPE)),
Objects.toString(headers.get(Headers.AGGREGATE_ID)),
(Long) headers.get(Headers.AGGREGATE_SEQ),
delegateMessage, () -> DateTimeUtils.parseInstant(timestamp));
} else {
return new GenericEventMessage<>(delegateMessage, () -> DateTimeUtils.parseInstant(timestamp));
}
}

private static class SettableTimestampMessageHeaders extends MessageHeaders {
protected SettableTimestampMessageHeaders(Map<String, Object> headers, Long timestamp) {
super(headers, null, timestamp);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2010-2014. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.extensions.springcloud.eventhandling.converter;

import org.axonframework.eventhandling.EventMessage;
import org.springframework.messaging.Message;

/**
* Interface describing a mechanism that converts Spring Messages from an Axon Messages and vice versa.
*
* @author Mehdi Chitforoosh
* @since 4.1
*/
public interface SpringMessageEventMessageConverter {

/**
* Creates an Spring Message from given {@code eventMessage}.
*
* @param eventMessage The EventMessage to create the Spring Message from
* @return an Spring Message containing the payload and headers
* Broker.
*/
Message<?> toSpringMessage(EventMessage<?> eventMessage);

/**
* Reconstruct an EventMessage from the given spring message. The returned value
* resolves to a message.
*
* @param message spring message
* @return The Event Message to publish on the local event processors
*/
EventMessage<?> toEventMessage(Message<?> message);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package org.axonframework.extensions.springcloud.eventhandling.inbound;

import org.axonframework.eventhandling.EventMessage;
import org.axonframework.extensions.springcloud.eventhandling.converter.DefaultSpringMessageEventMessageConverter;
import org.axonframework.extensions.springcloud.eventhandling.converter.SpringMessageEventMessageConverter;
import org.axonframework.messaging.SubscribableMessageSource;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;

import java.util.List;
import java.util.function.Predicate;

/**
*
* @author Mehdi Chitforoosh
* @since 4.1
*/
public class AxonInboundChannelAdapter extends MessageProducerSupport {

private final Predicate<? super EventMessage<?>> filter;
private final SubscribableMessageSource<EventMessage<?>> messageSource;
private final SpringMessageEventMessageConverter converter;

/**
* Initialize an adapter to forward messages from the given {@code messageSource} to the given {@code channel}.
* Messages are not filtered; all messages are forwarded to the MessageChannel
*
* @param messageSource The event bus to subscribe to.
*/
public AxonInboundChannelAdapter(SubscribableMessageSource<EventMessage<?>> messageSource) {
this(messageSource, m -> true, new DefaultSpringMessageEventMessageConverter());
}

/**
* Initialize an adapter to forward messages from the given {@code messageSource} to the given {@code channel}.
* Messages are filtered using the given {@code filter}.
*
* @param messageSource The inbound of messages to subscribe to.
* @param filter The filter that indicates which messages to forward.
*/
public AxonInboundChannelAdapter(SubscribableMessageSource<EventMessage<?>> messageSource, Predicate<? super EventMessage<?>> filter) {
this(messageSource, filter, new DefaultSpringMessageEventMessageConverter());
}

/**
* Initialize an adapter to forward messages from the given {@code messageSource} to the given {@code channel}.
* Messages are filtered using the given {@code filter}.
*
* @param messageSource The inbound of messages to subscribe to.
* @param filter The filter that indicates which messages to forward.
* @param converter The converter to use to convert event message into Spring message
*/
public AxonInboundChannelAdapter(SubscribableMessageSource<EventMessage<?>> messageSource, Predicate<? super EventMessage<?>> filter, SpringMessageEventMessageConverter converter) {
this.messageSource = messageSource;
this.filter = filter;
this.converter = converter;
}

/**
* Subscribes this event listener to the event bus.
*/
@Override
protected void onInit() {
super.onInit();
this.messageSource.subscribe(this::handle);
}


/**
* If allows by the filter, wraps the given {@code event} in a {@link GenericMessage} ands sends it to the
* configured {@link MessageChannel}.
*
* @param events the events to handle
*/
protected void handle(List<? extends EventMessage<?>> events) {
events.stream()
.filter(this.filter)
.forEach(event -> this.sendMessage(this.converter.toSpringMessage(event)));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package org.axonframework.extensions.springcloud.eventhandling.outbound;

import org.axonframework.common.Registration;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.extensions.springcloud.eventhandling.converter.DefaultSpringMessageEventMessageConverter;
import org.axonframework.extensions.springcloud.eventhandling.converter.SpringMessageEventMessageConverter;
import org.axonframework.messaging.SubscribableMessageSource;
import org.springframework.integration.handler.AbstractMessageProducingHandler;
import org.springframework.messaging.Message;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

import static java.util.Collections.singletonList;

/**
*
* @author Mehdi Chitforoosh
* @since 4.1
*/
public class AxonOutboundChannelAdapter extends AbstractMessageProducingHandler implements SubscribableMessageSource<EventMessage<?>> {

private final CopyOnWriteArrayList<Consumer<List<? extends EventMessage<?>>>> messageProcessors = new CopyOnWriteArrayList<>();
private final SpringMessageEventMessageConverter eventMessageConverter;

/**
* Initialize an AxonOutboundChannelAdapter instance that sends all incoming Event Messages to the given
* {@code eventBus}. It is still possible for other Event Processors to subscribe to this MessageChannelAdapter.
*
* @param eventBus The EventBus instance for forward all messages to
*/
public AxonOutboundChannelAdapter(EventBus eventBus) {
this(singletonList(eventBus::publish), new DefaultSpringMessageEventMessageConverter());
}

/**
* Initialize the adapter to publish all incoming events to the subscribed processors. Note that this instance should
* be registered as a consumer of a Spring Message Channel.
*
* @param processors Processors to be subscribed
* @param eventMessageConverter The message converter to use to convert spring message into event message
*/
public AxonOutboundChannelAdapter(List<Consumer<List<? extends EventMessage<?>>>> processors, SpringMessageEventMessageConverter eventMessageConverter) {
this.messageProcessors.addAll(processors);
this.eventMessageConverter = eventMessageConverter;
}

@Override
public Registration subscribe(Consumer<List<? extends EventMessage<?>>> messageProcessor) {
messageProcessors.add(messageProcessor);
return () -> messageProcessors.remove(messageProcessor);
}

/**
* Handles the given {@code message}. If the filter refuses the message, it is ignored.
*
* @param message The message containing the event to publish
*/
@Override
protected void handleMessageInternal(Message<?> message) throws Exception {
EventMessage<?> eventMessage = eventMessageConverter.toEventMessage(message);
List<? extends EventMessage<?>> messages = singletonList(eventMessage);
for (Consumer<List<? extends EventMessage<?>>> messageProcessor : messageProcessors) {
messageProcessor.accept(messages);
}
}

}
Loading