From ae67a85c40a8d2875c3d6eac1837ee905362e735 Mon Sep 17 00:00:00 2001 From: Mehdi Chitforoosh Yazdi Date: Fri, 12 Apr 2019 18:53:41 +0430 Subject: [PATCH 1/9] Add Axon Inbound,Outbound spring message channel adapters and processor message handler. --- pom.xml | 2 + springcloud/pom.xml | 12 ++ .../inbound/AxonInboundChannelAdapter.java | 82 +++++++++++++ .../outbound/AxonOutboundChannelAdapter.java | 70 +++++++++++ .../AxonProcessorMessageHandler.java | 110 ++++++++++++++++++ 5 files changed, 276 insertions(+) create mode 100644 springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/inbound/AxonInboundChannelAdapter.java create mode 100644 springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/outbound/AxonOutboundChannelAdapter.java create mode 100644 springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/processor/AxonProcessorMessageHandler.java diff --git a/pom.xml b/pom.xml index d547dc1..a092ff3 100644 --- a/pom.xml +++ b/pom.xml @@ -114,6 +114,8 @@ 5.1.4.RELEASE 2.1.2.RELEASE 2.0.1.RELEASE + 5.1.6.RELEASE + 5.1.4.RELEASE UTF-8 2.15.0 2.9.8 diff --git a/springcloud/pom.xml b/springcloud/pom.xml index 66b63ef..0716571 100644 --- a/springcloud/pom.xml +++ b/springcloud/pom.xml @@ -67,6 +67,18 @@ true + + org.springframework + spring-messaging + ${spring-messaging.version} + + + + org.springframework.integration + spring-integration-core + ${spring-integration.version} + + com.fasterxml.jackson.core jackson-databind diff --git a/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/inbound/AxonInboundChannelAdapter.java b/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/inbound/AxonInboundChannelAdapter.java new file mode 100644 index 0000000..986b816 --- /dev/null +++ b/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/inbound/AxonInboundChannelAdapter.java @@ -0,0 +1,82 @@ +package org.axonframework.extensions.springcloud.eventhandling.inbound; + +import org.axonframework.eventhandling.EventMessage; +import org.axonframework.extensions.springcloud.eventhandling.converter.DefaultSpringMessageEventMessageConverter; +import org.axonframework.extensions.springcloud.eventhandling.converter.SpringMessageEventMessageConverter; +import org.axonframework.messaging.SubscribableMessageSource; +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.GenericMessage; + +import java.util.List; +import java.util.function.Predicate; + +/** + * + * @author Mehdi Chitforoosh + * @since 4.1 + */ +public class AxonInboundChannelAdapter extends MessageProducerSupport { + + private final Predicate> filter; + private final SubscribableMessageSource> messageSource; + private final SpringMessageEventMessageConverter converter; + + /** + * Initialize an adapter to forward messages from the given {@code messageSource} to the given {@code channel}. + * Messages are not filtered; all messages are forwarded to the MessageChannel + * + * @param messageSource The event bus to subscribe to. + */ + public AxonInboundChannelAdapter(SubscribableMessageSource> messageSource) { + this(messageSource, m -> true, new DefaultSpringMessageEventMessageConverter()); + } + + /** + * Initialize an adapter to forward messages from the given {@code messageSource} to the given {@code channel}. + * Messages are filtered using the given {@code filter}. + * + * @param messageSource The inbound of messages to subscribe to. + * @param filter The filter that indicates which messages to forward. + */ + public AxonInboundChannelAdapter(SubscribableMessageSource> messageSource, Predicate> filter) { + this(messageSource, filter, new DefaultSpringMessageEventMessageConverter()); + } + + /** + * Initialize an adapter to forward messages from the given {@code messageSource} to the given {@code channel}. + * Messages are filtered using the given {@code filter}. + * + * @param messageSource The inbound of messages to subscribe to. + * @param filter The filter that indicates which messages to forward. + * @param converter The converter to use to convert event message into Spring message + */ + public AxonInboundChannelAdapter(SubscribableMessageSource> messageSource, Predicate> filter, SpringMessageEventMessageConverter converter) { + this.messageSource = messageSource; + this.filter = filter; + this.converter = converter; + } + + /** + * Subscribes this event listener to the event bus. + */ + @Override + protected void onInit() { + super.onInit(); + this.messageSource.subscribe(this::handle); + } + + + /** + * If allows by the filter, wraps the given {@code event} in a {@link GenericMessage} ands sends it to the + * configured {@link MessageChannel}. + * + * @param events the events to handle + */ + protected void handle(List> events) { + events.stream() + .filter(this.filter) + .forEach(event -> this.sendMessage(this.converter.toSpringMessage(event))); + } + +} diff --git a/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/outbound/AxonOutboundChannelAdapter.java b/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/outbound/AxonOutboundChannelAdapter.java new file mode 100644 index 0000000..8d49222 --- /dev/null +++ b/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/outbound/AxonOutboundChannelAdapter.java @@ -0,0 +1,70 @@ +package org.axonframework.extensions.springcloud.eventhandling.outbound; + +import org.axonframework.common.Registration; +import org.axonframework.eventhandling.EventBus; +import org.axonframework.eventhandling.EventMessage; +import org.axonframework.extensions.springcloud.eventhandling.converter.DefaultSpringMessageEventMessageConverter; +import org.axonframework.extensions.springcloud.eventhandling.converter.SpringMessageEventMessageConverter; +import org.axonframework.messaging.SubscribableMessageSource; +import org.springframework.integration.handler.AbstractMessageProducingHandler; +import org.springframework.messaging.Message; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Consumer; + +import static java.util.Collections.singletonList; + +/** + * + * @author Mehdi Chitforoosh + * @since 4.1 + */ +public class AxonOutboundChannelAdapter extends AbstractMessageProducingHandler implements SubscribableMessageSource> { + + private final CopyOnWriteArrayList>>> messageProcessors = new CopyOnWriteArrayList<>(); + private final SpringMessageEventMessageConverter eventMessageConverter; + + /** + * Initialize an AxonOutboundChannelAdapter instance that sends all incoming Event Messages to the given + * {@code eventBus}. It is still possible for other Event Processors to subscribe to this MessageChannelAdapter. + * + * @param eventBus The EventBus instance for forward all messages to + */ + public AxonOutboundChannelAdapter(EventBus eventBus) { + this(singletonList(eventBus::publish), new DefaultSpringMessageEventMessageConverter()); + } + + /** + * Initialize the adapter to publish all incoming events to the subscribed processors. Note that this instance should + * be registered as a consumer of a Spring Message Channel. + * + * @param processors Processors to be subscribed + * @param eventMessageConverter The message converter to use to convert spring message into event message + */ + public AxonOutboundChannelAdapter(List>>> processors, SpringMessageEventMessageConverter eventMessageConverter) { + this.messageProcessors.addAll(processors); + this.eventMessageConverter = eventMessageConverter; + } + + @Override + public Registration subscribe(Consumer>> messageProcessor) { + messageProcessors.add(messageProcessor); + return () -> messageProcessors.remove(messageProcessor); + } + + /** + * Handles the given {@code message}. If the filter refuses the message, it is ignored. + * + * @param message The message containing the event to publish + */ + @Override + protected void handleMessageInternal(Message message) throws Exception { + EventMessage eventMessage = eventMessageConverter.toEventMessage(message); + List> messages = singletonList(eventMessage); + for (Consumer>> messageProcessor : messageProcessors) { + messageProcessor.accept(messages); + } + } + +} diff --git a/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/processor/AxonProcessorMessageHandler.java b/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/processor/AxonProcessorMessageHandler.java new file mode 100644 index 0000000..a93461a --- /dev/null +++ b/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/processor/AxonProcessorMessageHandler.java @@ -0,0 +1,110 @@ +package org.axonframework.extensions.springcloud.eventhandling.processor; + +import org.axonframework.common.Registration; +import org.axonframework.eventhandling.EventBus; +import org.axonframework.eventhandling.EventMessage; +import org.axonframework.extensions.springcloud.eventhandling.converter.DefaultSpringMessageEventMessageConverter; +import org.axonframework.extensions.springcloud.eventhandling.converter.SpringMessageEventMessageConverter; +import org.axonframework.messaging.SubscribableMessageSource; +import org.springframework.integration.handler.AbstractMessageProducingHandler; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.GenericMessage; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Consumer; +import java.util.function.Predicate; + +import static java.util.Collections.singletonList; + +/** + * @author Mehdi chitforoosh + * @since 4.1 + */ +public class AxonProcessorMessageHandler extends AbstractMessageProducingHandler implements SubscribableMessageSource> { + + private final SubscribableMessageSource> messageSource; + private final CopyOnWriteArrayList>>> messageProcessors = new CopyOnWriteArrayList<>(); + private final Predicate> filter; + private final SpringMessageEventMessageConverter eventMessageConverter; + + /** + * Initialize the adapter to publish all incoming events to the subscribed processors. Note that this instance should + * be registered as a consumer of a Spring Message Channel. + */ + public AxonProcessorMessageHandler(EventBus eventBus) { + this(eventBus, singletonList(eventBus::publish), m -> true, new DefaultSpringMessageEventMessageConverter()); + } + + /** + * Initialize an AxonProcessorMessageHandler instance that sends all incoming Event Messages to the given + * {@code eventBus}. It is still possible for other Event Processors to subscribe to this MessageChannelAdapter. + * + * @param eventBus The EventBus instance for forward all messages to + * @param filter The filter that indicates which messages to forward. + */ + public AxonProcessorMessageHandler(EventBus eventBus, Predicate> filter) { + this(eventBus, singletonList(eventBus::publish), filter, new DefaultSpringMessageEventMessageConverter()); + } + + /** + * Initialize the adapter to publish all incoming events to the subscribed processors. Note that this instance should + * be registered as a consumer of a Spring Message Channel. + * + * @param messageSource The inbound of messages to subscribe to. + * @param processors Processors to be subscribed + * @param filter The filter that indicates which messages to forward. + * @param eventMessageConverter The message converter to use to convert spring message into event message + */ + public AxonProcessorMessageHandler(SubscribableMessageSource> messageSource, List>>> processors + , Predicate> filter, SpringMessageEventMessageConverter eventMessageConverter) { + this.messageSource = messageSource; + this.messageProcessors.addAll(processors); + this.filter = filter; + this.eventMessageConverter = eventMessageConverter; + } + + @Override + public Registration subscribe(Consumer>> messageProcessor) { + messageProcessors.add(messageProcessor); + return () -> messageProcessors.remove(messageProcessor); + } + + /** + * Subscribes this event listener to the event bus. + */ + @Override + protected void onInit() { + super.onInit(); + messageSource.subscribe(this::handle); + } + + + /** + * If allows by the filter, wraps the given {@code event} in a {@link GenericMessage} ands sends it to the + * configured {@link MessageChannel}. + * + * @param events the events to handle + */ + protected void handle(List> events) { + events.stream() + .filter(filter) + .forEach(event -> this.sendOutput(eventMessageConverter.toSpringMessage(event), null, false)); + } + + /** + * Handles the given {@code message}. If the filter refuses the message, it is ignored. + * + * @param message The message containing the event to publish + */ + @Override + protected void handleMessageInternal(Message message) throws Exception { + EventMessage eventMessage = eventMessageConverter.toEventMessage(message); + List> messages = singletonList(eventMessage); + for (Consumer>> messageProcessor : messageProcessors) { + messageProcessor.accept(messages); + } + } + +} From e75e594c773a0e98089ed1600ea73fb0210eecc4 Mon Sep 17 00:00:00 2001 From: Mehdi Chitforoosh Yazdi Date: Fri, 12 Apr 2019 18:54:53 +0430 Subject: [PATCH 2/9] Add default Spring Message to EventMessage converter. --- ...ultSpringMessageEventMessageConverter.java | 97 +++++++++++++++++++ .../SpringMessageEventMessageConverter.java | 47 +++++++++ 2 files changed, 144 insertions(+) create mode 100644 springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/DefaultSpringMessageEventMessageConverter.java create mode 100644 springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/SpringMessageEventMessageConverter.java diff --git a/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/DefaultSpringMessageEventMessageConverter.java b/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/DefaultSpringMessageEventMessageConverter.java new file mode 100644 index 0000000..710099c --- /dev/null +++ b/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/DefaultSpringMessageEventMessageConverter.java @@ -0,0 +1,97 @@ +package org.axonframework.extensions.springcloud.eventhandling.converter; + +import org.axonframework.common.Assert; +import org.axonframework.common.DateTimeUtils; +import org.axonframework.eventhandling.EventMessage; +import org.axonframework.eventhandling.GenericDomainEventMessage; +import org.axonframework.eventhandling.GenericEventMessage; +import org.axonframework.messaging.Headers; +import org.axonframework.messaging.MetaData; +import org.axonframework.serialization.*; +import org.axonframework.serialization.json.JacksonSerializer; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.GenericMessage; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import static org.axonframework.common.DateTimeUtils.formatInstant; +import static org.axonframework.messaging.Headers.MESSAGE_TIMESTAMP; + +/** + * Default byte[] spring message to event message converter based on Jackson serializer. + * + * @Mehdi Chitforoosh + * @since 4.1 + */ +public class DefaultSpringMessageEventMessageConverter implements SpringMessageEventMessageConverter { + + private final Serializer serializer; + + public DefaultSpringMessageEventMessageConverter() { + this.serializer = JacksonSerializer.builder().build(); + } + + public DefaultSpringMessageEventMessageConverter(Serializer serializer) { + Assert.notNull(serializer, () -> "Serializer may not be null"); + this.serializer = serializer; + } + + @Override + public Message toSpringMessage(EventMessage eventMessage) { + SerializedObject serializedObject = eventMessage.serializePayload(serializer, byte[].class); + Map headers = new HashMap<>(); + eventMessage.getMetaData().forEach((k, v) -> headers.put(Headers.MESSAGE_METADATA + "-" + k, v)); + Headers.defaultHeaders(eventMessage, serializedObject).forEach((k, v) -> { + if (k.equals(MESSAGE_TIMESTAMP)) { + headers.put(k, formatInstant(eventMessage.getTimestamp())); + } else { + headers.put(k, v); + } + }); + return new GenericMessage<>(serializedObject.getData(), new DefaultSpringMessageEventMessageConverter.SettableTimestampMessageHeaders(headers, eventMessage.getTimestamp().toEpochMilli())); + + } + + @Override + public EventMessage toEventMessage(Message message) { + if (!(message.getPayload() instanceof byte[])) { + throw new IllegalArgumentException("message payload should be byte[]"); + } + MessageHeaders headers = message.getHeaders(); + if (!headers.keySet().containsAll(Arrays.asList(Headers.MESSAGE_ID, Headers.MESSAGE_TYPE))) { + throw new IllegalArgumentException("axon message id or axon message type doesn't exist."); + } + byte[] payload = (byte[]) message.getPayload(); + Map metaData = new HashMap<>(); + headers.forEach((k, v) -> { + if (k.startsWith(Headers.MESSAGE_METADATA + "-")) { + metaData.put(k.substring((Headers.MESSAGE_METADATA + "-").length()), v); + } + }); + SimpleSerializedObject serializedMessage = new SimpleSerializedObject<>(payload, byte[].class, + Objects.toString(headers.get(Headers.MESSAGE_TYPE)), + Objects.toString(headers.get(Headers.MESSAGE_REVISION), null)); + SerializedMessage delegateMessage = new SerializedMessage<>(Objects.toString(headers.get(Headers.MESSAGE_ID)), + new LazyDeserializingObject<>(serializedMessage, serializer), + new LazyDeserializingObject<>(MetaData.from(metaData))); + String timestamp = Objects.toString(headers.get(MESSAGE_TIMESTAMP)); + if (headers.containsKey(Headers.AGGREGATE_ID)) { + return new GenericDomainEventMessage<>(Objects.toString(headers.get(Headers.AGGREGATE_TYPE)), + Objects.toString(headers.get(Headers.AGGREGATE_ID)), + (Long) headers.get(Headers.AGGREGATE_SEQ), + delegateMessage, () -> DateTimeUtils.parseInstant(timestamp)); + } else { + return new GenericEventMessage<>(delegateMessage, () -> DateTimeUtils.parseInstant(timestamp)); + } + } + + private static class SettableTimestampMessageHeaders extends MessageHeaders { + protected SettableTimestampMessageHeaders(Map headers, Long timestamp) { + super(headers, null, timestamp); + } + } +} diff --git a/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/SpringMessageEventMessageConverter.java b/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/SpringMessageEventMessageConverter.java new file mode 100644 index 0000000..1545597 --- /dev/null +++ b/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/SpringMessageEventMessageConverter.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2010-2014. Axon Framework + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.axonframework.extensions.springcloud.eventhandling.converter; + +import org.axonframework.eventhandling.EventMessage; +import org.springframework.messaging.Message; + +/** + * Interface describing a mechanism that converts Spring Messages from an Axon Messages and vice versa. + * + * @author Mehdi Chitforoosh + * @since 4.1 + */ +public interface SpringMessageEventMessageConverter { + + /** + * Creates an Spring Message from given {@code eventMessage}. + * + * @param eventMessage The EventMessage to create the Spring Message from + * @return an Spring Message containing the payload and headers + * Broker. + */ + Message toSpringMessage(EventMessage eventMessage); + + /** + * Reconstruct an EventMessage from the given spring message. The returned value + * resolves to a message. + * + * @param message spring message + * @return The Event Message to publish on the local event processors + */ + EventMessage toEventMessage(Message message); +} From cf4138bc1094ca41a74438b35291aa3f3cd47a8d Mon Sep 17 00:00:00 2001 From: Mehdi Chitforoosh Date: Fri, 19 Mar 2021 11:29:57 +0330 Subject: [PATCH 3/9] Add message channel adapters for sending events on Spring Cloud Stream --- pom.xml | 9 +- springcloud-streams/pom.xml | 82 +++++++++++++ .../AxonInboundChannelAdapter.java | 82 +++++++++++++ .../AxonOutboundChannelAdapter.java | 69 +++++++++++ .../AxonProcessorMessageHandler.java | 110 ++++++++++++++++++ ...ultSpringMessageEventMessageConverter.java | 97 +++++++++++++++ .../SpringMessageEventMessageConverter.java | 47 ++++++++ 7 files changed, 493 insertions(+), 3 deletions(-) create mode 100644 springcloud-streams/pom.xml create mode 100644 springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/AxonInboundChannelAdapter.java create mode 100644 springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/AxonOutboundChannelAdapter.java create mode 100644 springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/AxonProcessorMessageHandler.java create mode 100644 springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/DefaultSpringMessageEventMessageConverter.java create mode 100644 springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/SpringMessageEventMessageConverter.java diff --git a/pom.xml b/pom.xml index 76e94fd..085b714 100644 --- a/pom.xml +++ b/pom.xml @@ -1,4 +1,4 @@ - + - - + org.axonframework.extensions.springcloud axon-springcloud-parent 4.5-SNAPSHOT springcloud + springcloud-streams springcloud-spring-boot-autoconfigure springcloud-spring-boot-starter @@ -54,6 +55,8 @@ Hoxton.SR10 5.3.4 + 5.3.4 + 5.3.4.RELEASE 2.3.9.RELEASE 1.7.30 diff --git a/springcloud-streams/pom.xml b/springcloud-streams/pom.xml new file mode 100644 index 0000000..efafcde --- /dev/null +++ b/springcloud-streams/pom.xml @@ -0,0 +1,82 @@ + + + + + 4.0.0 + + axon-springcloud-parent + org.axonframework.extensions.springcloud + 4.5-SNAPSHOT + + + axon-springcloud-streams + + Axon Framework Spring Cloud Stream Extension + + Module containing Message Adapter implementations for distributing events over Spring Cloud Stream. + + + jar + + + + + org.axonframework + axon-messaging + ${axon.version} + + + org.springframework + spring-messaging + ${spring-messaging.version} + + + org.springframework.integration + spring-integration-core + ${spring-integration.version} + + + + junit + junit + 4.11 + test + + + + + + + src/main/resources + true + + + + + maven-jar-plugin + + + + org.axonframework.extensions.springcloud + + + + + + + diff --git a/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/AxonInboundChannelAdapter.java b/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/AxonInboundChannelAdapter.java new file mode 100644 index 0000000..fe805bf --- /dev/null +++ b/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/AxonInboundChannelAdapter.java @@ -0,0 +1,82 @@ +package org.axonframework.extensions.springcloud.eventhandling; + +import org.axonframework.eventhandling.EventMessage; +import org.axonframework.extensions.springcloud.eventhandling.converter.DefaultSpringMessageEventMessageConverter; +import org.axonframework.extensions.springcloud.eventhandling.converter.SpringMessageEventMessageConverter; +import org.axonframework.messaging.SubscribableMessageSource; +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.GenericMessage; + +import java.util.List; +import java.util.function.Predicate; + +/** + * + * @author Mehdi Chitforoosh + * @since 4.1 + */ +public class AxonInboundChannelAdapter extends MessageProducerSupport { + + private final Predicate> filter; + private final SubscribableMessageSource> messageSource; + private final SpringMessageEventMessageConverter converter; + + /** + * Initialize an adapter to forward messages from the given {@code messageSource} to the given {@code channel}. + * Messages are not filtered; all messages are forwarded to the MessageChannel + * + * @param messageSource The event bus to subscribe to. + */ + public AxonInboundChannelAdapter(SubscribableMessageSource> messageSource) { + this(messageSource, m -> true, new DefaultSpringMessageEventMessageConverter()); + } + + /** + * Initialize an adapter to forward messages from the given {@code messageSource} to the given {@code channel}. + * Messages are filtered using the given {@code filter}. + * + * @param messageSource The inbound of messages to subscribe to. + * @param filter The filter that indicates which messages to forward. + */ + public AxonInboundChannelAdapter(SubscribableMessageSource> messageSource, Predicate> filter) { + this(messageSource, filter, new DefaultSpringMessageEventMessageConverter()); + } + + /** + * Initialize an adapter to forward messages from the given {@code messageSource} to the given {@code channel}. + * Messages are filtered using the given {@code filter}. + * + * @param messageSource The inbound of messages to subscribe to. + * @param filter The filter that indicates which messages to forward. + * @param converter The converter to use to convert event message into Spring message + */ + public AxonInboundChannelAdapter(SubscribableMessageSource> messageSource, Predicate> filter, SpringMessageEventMessageConverter converter) { + this.messageSource = messageSource; + this.filter = filter; + this.converter = converter; + } + + /** + * Subscribes this event listener to the event bus. + */ + @Override + protected void onInit() { + super.onInit(); + this.messageSource.subscribe(this::handle); + } + + + /** + * If allows by the filter, wraps the given {@code event} in a {@link GenericMessage} ands sends it to the + * configured {@link MessageChannel}. + * + * @param events the events to handle + */ + protected void handle(List> events) { + events.stream() + .filter(this.filter) + .forEach(event -> this.sendMessage(this.converter.toSpringMessage(event))); + } + +} diff --git a/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/AxonOutboundChannelAdapter.java b/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/AxonOutboundChannelAdapter.java new file mode 100644 index 0000000..253f03a --- /dev/null +++ b/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/AxonOutboundChannelAdapter.java @@ -0,0 +1,69 @@ +package org.axonframework.extensions.springcloud.eventhandling; + +import org.axonframework.common.Registration; +import org.axonframework.eventhandling.EventBus; +import org.axonframework.eventhandling.EventMessage; +import org.axonframework.extensions.springcloud.eventhandling.converter.DefaultSpringMessageEventMessageConverter; +import org.axonframework.extensions.springcloud.eventhandling.converter.SpringMessageEventMessageConverter; +import org.axonframework.messaging.SubscribableMessageSource; +import org.springframework.integration.handler.AbstractMessageProducingHandler; +import org.springframework.messaging.Message; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Consumer; + +import static java.util.Collections.singletonList; + +/** + * @author Mehdi Chitforoosh + * @since 4.1 + */ +public class AxonOutboundChannelAdapter extends AbstractMessageProducingHandler implements SubscribableMessageSource> { + + private final CopyOnWriteArrayList>>> messageProcessors = new CopyOnWriteArrayList<>(); + private final SpringMessageEventMessageConverter eventMessageConverter; + + /** + * Initialize an AxonOutboundChannelAdapter instance that sends all incoming Event Messages to the given + * {@code eventBus}. It is still possible for other Event Processors to subscribe to this MessageChannelAdapter. + * + * @param eventBus The EventBus instance for forward all messages to + */ + public AxonOutboundChannelAdapter(EventBus eventBus) { + this(singletonList(eventBus::publish), new DefaultSpringMessageEventMessageConverter()); + } + + /** + * Initialize the adapter to publish all incoming events to the subscribed processors. Note that this instance should + * be registered as a consumer of a Spring Message Channel. + * + * @param processors Processors to be subscribed + * @param eventMessageConverter The message converter to use to convert spring message into event message + */ + public AxonOutboundChannelAdapter(List>>> processors, SpringMessageEventMessageConverter eventMessageConverter) { + this.messageProcessors.addAll(processors); + this.eventMessageConverter = eventMessageConverter; + } + + @Override + public Registration subscribe(Consumer>> messageProcessor) { + messageProcessors.add(messageProcessor); + return () -> messageProcessors.remove(messageProcessor); + } + + /** + * Handles the given {@code message}. If the filter refuses the message, it is ignored. + * + * @param message The message containing the event to publish + */ + @Override + protected void handleMessageInternal(Message message) { + EventMessage eventMessage = eventMessageConverter.toEventMessage(message); + List> messages = singletonList(eventMessage); + for (Consumer>> messageProcessor : messageProcessors) { + messageProcessor.accept(messages); + } + } + +} diff --git a/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/AxonProcessorMessageHandler.java b/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/AxonProcessorMessageHandler.java new file mode 100644 index 0000000..216213f --- /dev/null +++ b/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/AxonProcessorMessageHandler.java @@ -0,0 +1,110 @@ +package org.axonframework.extensions.springcloud.eventhandling; + +import org.axonframework.common.Registration; +import org.axonframework.eventhandling.EventBus; +import org.axonframework.eventhandling.EventMessage; +import org.axonframework.extensions.springcloud.eventhandling.converter.DefaultSpringMessageEventMessageConverter; +import org.axonframework.extensions.springcloud.eventhandling.converter.SpringMessageEventMessageConverter; +import org.axonframework.messaging.SubscribableMessageSource; +import org.springframework.integration.handler.AbstractMessageProducingHandler; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.GenericMessage; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Consumer; +import java.util.function.Predicate; + +import static java.util.Collections.singletonList; + +/** + * @author Mehdi chitforoosh + * @since 4.1 + */ +public class AxonProcessorMessageHandler extends AbstractMessageProducingHandler implements SubscribableMessageSource> { + + private final SubscribableMessageSource> messageSource; + private final CopyOnWriteArrayList>>> messageProcessors = new CopyOnWriteArrayList<>(); + private final Predicate> filter; + private final SpringMessageEventMessageConverter eventMessageConverter; + + /** + * Initialize the adapter to publish all incoming events to the subscribed processors. Note that this instance should + * be registered as a consumer of a Spring Message Channel. + */ + public AxonProcessorMessageHandler(EventBus eventBus) { + this(eventBus, singletonList(eventBus::publish), m -> true, new DefaultSpringMessageEventMessageConverter()); + } + + /** + * Initialize an AxonProcessorMessageHandler instance that sends all incoming Event Messages to the given + * {@code eventBus}. It is still possible for other Event Processors to subscribe to this MessageChannelAdapter. + * + * @param eventBus The EventBus instance for forward all messages to + * @param filter The filter that indicates which messages to forward. + */ + public AxonProcessorMessageHandler(EventBus eventBus, Predicate> filter) { + this(eventBus, singletonList(eventBus::publish), filter, new DefaultSpringMessageEventMessageConverter()); + } + + /** + * Initialize the adapter to publish all incoming events to the subscribed processors. Note that this instance should + * be registered as a consumer of a Spring Message Channel. + * + * @param messageSource The inbound of messages to subscribe to. + * @param processors Processors to be subscribed + * @param filter The filter that indicates which messages to forward. + * @param eventMessageConverter The message converter to use to convert spring message into event message + */ + public AxonProcessorMessageHandler(SubscribableMessageSource> messageSource, List>>> processors + , Predicate> filter, SpringMessageEventMessageConverter eventMessageConverter) { + this.messageSource = messageSource; + this.messageProcessors.addAll(processors); + this.filter = filter; + this.eventMessageConverter = eventMessageConverter; + } + + @Override + public Registration subscribe(Consumer>> messageProcessor) { + messageProcessors.add(messageProcessor); + return () -> messageProcessors.remove(messageProcessor); + } + + /** + * Subscribes this event listener to the event bus. + */ + @Override + protected void onInit() { + super.onInit(); + messageSource.subscribe(this::handle); + } + + + /** + * If allows by the filter, wraps the given {@code event} in a {@link GenericMessage} ands sends it to the + * configured {@link MessageChannel}. + * + * @param events the events to handle + */ + protected void handle(List> events) { + events.stream() + .filter(filter) + .forEach(event -> this.sendOutput(eventMessageConverter.toSpringMessage(event), null, false)); + } + + /** + * Handles the given {@code message}. If the filter refuses the message, it is ignored. + * + * @param message The message containing the event to publish + */ + @Override + protected void handleMessageInternal(Message message) { + EventMessage eventMessage = eventMessageConverter.toEventMessage(message); + List> messages = singletonList(eventMessage); + for (Consumer>> messageProcessor : messageProcessors) { + messageProcessor.accept(messages); + } + } + +} diff --git a/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/DefaultSpringMessageEventMessageConverter.java b/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/DefaultSpringMessageEventMessageConverter.java new file mode 100644 index 0000000..710099c --- /dev/null +++ b/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/DefaultSpringMessageEventMessageConverter.java @@ -0,0 +1,97 @@ +package org.axonframework.extensions.springcloud.eventhandling.converter; + +import org.axonframework.common.Assert; +import org.axonframework.common.DateTimeUtils; +import org.axonframework.eventhandling.EventMessage; +import org.axonframework.eventhandling.GenericDomainEventMessage; +import org.axonframework.eventhandling.GenericEventMessage; +import org.axonframework.messaging.Headers; +import org.axonframework.messaging.MetaData; +import org.axonframework.serialization.*; +import org.axonframework.serialization.json.JacksonSerializer; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.GenericMessage; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import static org.axonframework.common.DateTimeUtils.formatInstant; +import static org.axonframework.messaging.Headers.MESSAGE_TIMESTAMP; + +/** + * Default byte[] spring message to event message converter based on Jackson serializer. + * + * @Mehdi Chitforoosh + * @since 4.1 + */ +public class DefaultSpringMessageEventMessageConverter implements SpringMessageEventMessageConverter { + + private final Serializer serializer; + + public DefaultSpringMessageEventMessageConverter() { + this.serializer = JacksonSerializer.builder().build(); + } + + public DefaultSpringMessageEventMessageConverter(Serializer serializer) { + Assert.notNull(serializer, () -> "Serializer may not be null"); + this.serializer = serializer; + } + + @Override + public Message toSpringMessage(EventMessage eventMessage) { + SerializedObject serializedObject = eventMessage.serializePayload(serializer, byte[].class); + Map headers = new HashMap<>(); + eventMessage.getMetaData().forEach((k, v) -> headers.put(Headers.MESSAGE_METADATA + "-" + k, v)); + Headers.defaultHeaders(eventMessage, serializedObject).forEach((k, v) -> { + if (k.equals(MESSAGE_TIMESTAMP)) { + headers.put(k, formatInstant(eventMessage.getTimestamp())); + } else { + headers.put(k, v); + } + }); + return new GenericMessage<>(serializedObject.getData(), new DefaultSpringMessageEventMessageConverter.SettableTimestampMessageHeaders(headers, eventMessage.getTimestamp().toEpochMilli())); + + } + + @Override + public EventMessage toEventMessage(Message message) { + if (!(message.getPayload() instanceof byte[])) { + throw new IllegalArgumentException("message payload should be byte[]"); + } + MessageHeaders headers = message.getHeaders(); + if (!headers.keySet().containsAll(Arrays.asList(Headers.MESSAGE_ID, Headers.MESSAGE_TYPE))) { + throw new IllegalArgumentException("axon message id or axon message type doesn't exist."); + } + byte[] payload = (byte[]) message.getPayload(); + Map metaData = new HashMap<>(); + headers.forEach((k, v) -> { + if (k.startsWith(Headers.MESSAGE_METADATA + "-")) { + metaData.put(k.substring((Headers.MESSAGE_METADATA + "-").length()), v); + } + }); + SimpleSerializedObject serializedMessage = new SimpleSerializedObject<>(payload, byte[].class, + Objects.toString(headers.get(Headers.MESSAGE_TYPE)), + Objects.toString(headers.get(Headers.MESSAGE_REVISION), null)); + SerializedMessage delegateMessage = new SerializedMessage<>(Objects.toString(headers.get(Headers.MESSAGE_ID)), + new LazyDeserializingObject<>(serializedMessage, serializer), + new LazyDeserializingObject<>(MetaData.from(metaData))); + String timestamp = Objects.toString(headers.get(MESSAGE_TIMESTAMP)); + if (headers.containsKey(Headers.AGGREGATE_ID)) { + return new GenericDomainEventMessage<>(Objects.toString(headers.get(Headers.AGGREGATE_TYPE)), + Objects.toString(headers.get(Headers.AGGREGATE_ID)), + (Long) headers.get(Headers.AGGREGATE_SEQ), + delegateMessage, () -> DateTimeUtils.parseInstant(timestamp)); + } else { + return new GenericEventMessage<>(delegateMessage, () -> DateTimeUtils.parseInstant(timestamp)); + } + } + + private static class SettableTimestampMessageHeaders extends MessageHeaders { + protected SettableTimestampMessageHeaders(Map headers, Long timestamp) { + super(headers, null, timestamp); + } + } +} diff --git a/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/SpringMessageEventMessageConverter.java b/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/SpringMessageEventMessageConverter.java new file mode 100644 index 0000000..1545597 --- /dev/null +++ b/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/SpringMessageEventMessageConverter.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2010-2014. Axon Framework + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.axonframework.extensions.springcloud.eventhandling.converter; + +import org.axonframework.eventhandling.EventMessage; +import org.springframework.messaging.Message; + +/** + * Interface describing a mechanism that converts Spring Messages from an Axon Messages and vice versa. + * + * @author Mehdi Chitforoosh + * @since 4.1 + */ +public interface SpringMessageEventMessageConverter { + + /** + * Creates an Spring Message from given {@code eventMessage}. + * + * @param eventMessage The EventMessage to create the Spring Message from + * @return an Spring Message containing the payload and headers + * Broker. + */ + Message toSpringMessage(EventMessage eventMessage); + + /** + * Reconstruct an EventMessage from the given spring message. The returned value + * resolves to a message. + * + * @param message spring message + * @return The Event Message to publish on the local event processors + */ + EventMessage toEventMessage(Message message); +} From 6431df2e3e4c462d580ca47383cab00511865f5c Mon Sep 17 00:00:00 2001 From: Mehdi Chitforoosh Date: Sat, 3 Apr 2021 19:45:03 +0430 Subject: [PATCH 4/9] Remove event handling related classes from command handling package --- ...ultSpringMessageEventMessageConverter.java | 97 --------------- .../SpringMessageEventMessageConverter.java | 47 -------- .../inbound/AxonInboundChannelAdapter.java | 82 ------------- .../outbound/AxonOutboundChannelAdapter.java | 70 ----------- .../AxonProcessorMessageHandler.java | 110 ------------------ 5 files changed, 406 deletions(-) delete mode 100644 springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/DefaultSpringMessageEventMessageConverter.java delete mode 100644 springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/SpringMessageEventMessageConverter.java delete mode 100644 springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/inbound/AxonInboundChannelAdapter.java delete mode 100644 springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/outbound/AxonOutboundChannelAdapter.java delete mode 100644 springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/processor/AxonProcessorMessageHandler.java diff --git a/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/DefaultSpringMessageEventMessageConverter.java b/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/DefaultSpringMessageEventMessageConverter.java deleted file mode 100644 index 710099c..0000000 --- a/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/DefaultSpringMessageEventMessageConverter.java +++ /dev/null @@ -1,97 +0,0 @@ -package org.axonframework.extensions.springcloud.eventhandling.converter; - -import org.axonframework.common.Assert; -import org.axonframework.common.DateTimeUtils; -import org.axonframework.eventhandling.EventMessage; -import org.axonframework.eventhandling.GenericDomainEventMessage; -import org.axonframework.eventhandling.GenericEventMessage; -import org.axonframework.messaging.Headers; -import org.axonframework.messaging.MetaData; -import org.axonframework.serialization.*; -import org.axonframework.serialization.json.JacksonSerializer; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.support.GenericMessage; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; - -import static org.axonframework.common.DateTimeUtils.formatInstant; -import static org.axonframework.messaging.Headers.MESSAGE_TIMESTAMP; - -/** - * Default byte[] spring message to event message converter based on Jackson serializer. - * - * @Mehdi Chitforoosh - * @since 4.1 - */ -public class DefaultSpringMessageEventMessageConverter implements SpringMessageEventMessageConverter { - - private final Serializer serializer; - - public DefaultSpringMessageEventMessageConverter() { - this.serializer = JacksonSerializer.builder().build(); - } - - public DefaultSpringMessageEventMessageConverter(Serializer serializer) { - Assert.notNull(serializer, () -> "Serializer may not be null"); - this.serializer = serializer; - } - - @Override - public Message toSpringMessage(EventMessage eventMessage) { - SerializedObject serializedObject = eventMessage.serializePayload(serializer, byte[].class); - Map headers = new HashMap<>(); - eventMessage.getMetaData().forEach((k, v) -> headers.put(Headers.MESSAGE_METADATA + "-" + k, v)); - Headers.defaultHeaders(eventMessage, serializedObject).forEach((k, v) -> { - if (k.equals(MESSAGE_TIMESTAMP)) { - headers.put(k, formatInstant(eventMessage.getTimestamp())); - } else { - headers.put(k, v); - } - }); - return new GenericMessage<>(serializedObject.getData(), new DefaultSpringMessageEventMessageConverter.SettableTimestampMessageHeaders(headers, eventMessage.getTimestamp().toEpochMilli())); - - } - - @Override - public EventMessage toEventMessage(Message message) { - if (!(message.getPayload() instanceof byte[])) { - throw new IllegalArgumentException("message payload should be byte[]"); - } - MessageHeaders headers = message.getHeaders(); - if (!headers.keySet().containsAll(Arrays.asList(Headers.MESSAGE_ID, Headers.MESSAGE_TYPE))) { - throw new IllegalArgumentException("axon message id or axon message type doesn't exist."); - } - byte[] payload = (byte[]) message.getPayload(); - Map metaData = new HashMap<>(); - headers.forEach((k, v) -> { - if (k.startsWith(Headers.MESSAGE_METADATA + "-")) { - metaData.put(k.substring((Headers.MESSAGE_METADATA + "-").length()), v); - } - }); - SimpleSerializedObject serializedMessage = new SimpleSerializedObject<>(payload, byte[].class, - Objects.toString(headers.get(Headers.MESSAGE_TYPE)), - Objects.toString(headers.get(Headers.MESSAGE_REVISION), null)); - SerializedMessage delegateMessage = new SerializedMessage<>(Objects.toString(headers.get(Headers.MESSAGE_ID)), - new LazyDeserializingObject<>(serializedMessage, serializer), - new LazyDeserializingObject<>(MetaData.from(metaData))); - String timestamp = Objects.toString(headers.get(MESSAGE_TIMESTAMP)); - if (headers.containsKey(Headers.AGGREGATE_ID)) { - return new GenericDomainEventMessage<>(Objects.toString(headers.get(Headers.AGGREGATE_TYPE)), - Objects.toString(headers.get(Headers.AGGREGATE_ID)), - (Long) headers.get(Headers.AGGREGATE_SEQ), - delegateMessage, () -> DateTimeUtils.parseInstant(timestamp)); - } else { - return new GenericEventMessage<>(delegateMessage, () -> DateTimeUtils.parseInstant(timestamp)); - } - } - - private static class SettableTimestampMessageHeaders extends MessageHeaders { - protected SettableTimestampMessageHeaders(Map headers, Long timestamp) { - super(headers, null, timestamp); - } - } -} diff --git a/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/SpringMessageEventMessageConverter.java b/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/SpringMessageEventMessageConverter.java deleted file mode 100644 index 1545597..0000000 --- a/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/SpringMessageEventMessageConverter.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2010-2014. Axon Framework - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.axonframework.extensions.springcloud.eventhandling.converter; - -import org.axonframework.eventhandling.EventMessage; -import org.springframework.messaging.Message; - -/** - * Interface describing a mechanism that converts Spring Messages from an Axon Messages and vice versa. - * - * @author Mehdi Chitforoosh - * @since 4.1 - */ -public interface SpringMessageEventMessageConverter { - - /** - * Creates an Spring Message from given {@code eventMessage}. - * - * @param eventMessage The EventMessage to create the Spring Message from - * @return an Spring Message containing the payload and headers - * Broker. - */ - Message toSpringMessage(EventMessage eventMessage); - - /** - * Reconstruct an EventMessage from the given spring message. The returned value - * resolves to a message. - * - * @param message spring message - * @return The Event Message to publish on the local event processors - */ - EventMessage toEventMessage(Message message); -} diff --git a/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/inbound/AxonInboundChannelAdapter.java b/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/inbound/AxonInboundChannelAdapter.java deleted file mode 100644 index 986b816..0000000 --- a/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/inbound/AxonInboundChannelAdapter.java +++ /dev/null @@ -1,82 +0,0 @@ -package org.axonframework.extensions.springcloud.eventhandling.inbound; - -import org.axonframework.eventhandling.EventMessage; -import org.axonframework.extensions.springcloud.eventhandling.converter.DefaultSpringMessageEventMessageConverter; -import org.axonframework.extensions.springcloud.eventhandling.converter.SpringMessageEventMessageConverter; -import org.axonframework.messaging.SubscribableMessageSource; -import org.springframework.integration.endpoint.MessageProducerSupport; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.support.GenericMessage; - -import java.util.List; -import java.util.function.Predicate; - -/** - * - * @author Mehdi Chitforoosh - * @since 4.1 - */ -public class AxonInboundChannelAdapter extends MessageProducerSupport { - - private final Predicate> filter; - private final SubscribableMessageSource> messageSource; - private final SpringMessageEventMessageConverter converter; - - /** - * Initialize an adapter to forward messages from the given {@code messageSource} to the given {@code channel}. - * Messages are not filtered; all messages are forwarded to the MessageChannel - * - * @param messageSource The event bus to subscribe to. - */ - public AxonInboundChannelAdapter(SubscribableMessageSource> messageSource) { - this(messageSource, m -> true, new DefaultSpringMessageEventMessageConverter()); - } - - /** - * Initialize an adapter to forward messages from the given {@code messageSource} to the given {@code channel}. - * Messages are filtered using the given {@code filter}. - * - * @param messageSource The inbound of messages to subscribe to. - * @param filter The filter that indicates which messages to forward. - */ - public AxonInboundChannelAdapter(SubscribableMessageSource> messageSource, Predicate> filter) { - this(messageSource, filter, new DefaultSpringMessageEventMessageConverter()); - } - - /** - * Initialize an adapter to forward messages from the given {@code messageSource} to the given {@code channel}. - * Messages are filtered using the given {@code filter}. - * - * @param messageSource The inbound of messages to subscribe to. - * @param filter The filter that indicates which messages to forward. - * @param converter The converter to use to convert event message into Spring message - */ - public AxonInboundChannelAdapter(SubscribableMessageSource> messageSource, Predicate> filter, SpringMessageEventMessageConverter converter) { - this.messageSource = messageSource; - this.filter = filter; - this.converter = converter; - } - - /** - * Subscribes this event listener to the event bus. - */ - @Override - protected void onInit() { - super.onInit(); - this.messageSource.subscribe(this::handle); - } - - - /** - * If allows by the filter, wraps the given {@code event} in a {@link GenericMessage} ands sends it to the - * configured {@link MessageChannel}. - * - * @param events the events to handle - */ - protected void handle(List> events) { - events.stream() - .filter(this.filter) - .forEach(event -> this.sendMessage(this.converter.toSpringMessage(event))); - } - -} diff --git a/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/outbound/AxonOutboundChannelAdapter.java b/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/outbound/AxonOutboundChannelAdapter.java deleted file mode 100644 index 8d49222..0000000 --- a/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/outbound/AxonOutboundChannelAdapter.java +++ /dev/null @@ -1,70 +0,0 @@ -package org.axonframework.extensions.springcloud.eventhandling.outbound; - -import org.axonframework.common.Registration; -import org.axonframework.eventhandling.EventBus; -import org.axonframework.eventhandling.EventMessage; -import org.axonframework.extensions.springcloud.eventhandling.converter.DefaultSpringMessageEventMessageConverter; -import org.axonframework.extensions.springcloud.eventhandling.converter.SpringMessageEventMessageConverter; -import org.axonframework.messaging.SubscribableMessageSource; -import org.springframework.integration.handler.AbstractMessageProducingHandler; -import org.springframework.messaging.Message; - -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.function.Consumer; - -import static java.util.Collections.singletonList; - -/** - * - * @author Mehdi Chitforoosh - * @since 4.1 - */ -public class AxonOutboundChannelAdapter extends AbstractMessageProducingHandler implements SubscribableMessageSource> { - - private final CopyOnWriteArrayList>>> messageProcessors = new CopyOnWriteArrayList<>(); - private final SpringMessageEventMessageConverter eventMessageConverter; - - /** - * Initialize an AxonOutboundChannelAdapter instance that sends all incoming Event Messages to the given - * {@code eventBus}. It is still possible for other Event Processors to subscribe to this MessageChannelAdapter. - * - * @param eventBus The EventBus instance for forward all messages to - */ - public AxonOutboundChannelAdapter(EventBus eventBus) { - this(singletonList(eventBus::publish), new DefaultSpringMessageEventMessageConverter()); - } - - /** - * Initialize the adapter to publish all incoming events to the subscribed processors. Note that this instance should - * be registered as a consumer of a Spring Message Channel. - * - * @param processors Processors to be subscribed - * @param eventMessageConverter The message converter to use to convert spring message into event message - */ - public AxonOutboundChannelAdapter(List>>> processors, SpringMessageEventMessageConverter eventMessageConverter) { - this.messageProcessors.addAll(processors); - this.eventMessageConverter = eventMessageConverter; - } - - @Override - public Registration subscribe(Consumer>> messageProcessor) { - messageProcessors.add(messageProcessor); - return () -> messageProcessors.remove(messageProcessor); - } - - /** - * Handles the given {@code message}. If the filter refuses the message, it is ignored. - * - * @param message The message containing the event to publish - */ - @Override - protected void handleMessageInternal(Message message) throws Exception { - EventMessage eventMessage = eventMessageConverter.toEventMessage(message); - List> messages = singletonList(eventMessage); - for (Consumer>> messageProcessor : messageProcessors) { - messageProcessor.accept(messages); - } - } - -} diff --git a/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/processor/AxonProcessorMessageHandler.java b/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/processor/AxonProcessorMessageHandler.java deleted file mode 100644 index a93461a..0000000 --- a/springcloud/src/main/java/org/axonframework/extensions/springcloud/eventhandling/processor/AxonProcessorMessageHandler.java +++ /dev/null @@ -1,110 +0,0 @@ -package org.axonframework.extensions.springcloud.eventhandling.processor; - -import org.axonframework.common.Registration; -import org.axonframework.eventhandling.EventBus; -import org.axonframework.eventhandling.EventMessage; -import org.axonframework.extensions.springcloud.eventhandling.converter.DefaultSpringMessageEventMessageConverter; -import org.axonframework.extensions.springcloud.eventhandling.converter.SpringMessageEventMessageConverter; -import org.axonframework.messaging.SubscribableMessageSource; -import org.springframework.integration.handler.AbstractMessageProducingHandler; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.support.GenericMessage; - -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.function.Consumer; -import java.util.function.Predicate; - -import static java.util.Collections.singletonList; - -/** - * @author Mehdi chitforoosh - * @since 4.1 - */ -public class AxonProcessorMessageHandler extends AbstractMessageProducingHandler implements SubscribableMessageSource> { - - private final SubscribableMessageSource> messageSource; - private final CopyOnWriteArrayList>>> messageProcessors = new CopyOnWriteArrayList<>(); - private final Predicate> filter; - private final SpringMessageEventMessageConverter eventMessageConverter; - - /** - * Initialize the adapter to publish all incoming events to the subscribed processors. Note that this instance should - * be registered as a consumer of a Spring Message Channel. - */ - public AxonProcessorMessageHandler(EventBus eventBus) { - this(eventBus, singletonList(eventBus::publish), m -> true, new DefaultSpringMessageEventMessageConverter()); - } - - /** - * Initialize an AxonProcessorMessageHandler instance that sends all incoming Event Messages to the given - * {@code eventBus}. It is still possible for other Event Processors to subscribe to this MessageChannelAdapter. - * - * @param eventBus The EventBus instance for forward all messages to - * @param filter The filter that indicates which messages to forward. - */ - public AxonProcessorMessageHandler(EventBus eventBus, Predicate> filter) { - this(eventBus, singletonList(eventBus::publish), filter, new DefaultSpringMessageEventMessageConverter()); - } - - /** - * Initialize the adapter to publish all incoming events to the subscribed processors. Note that this instance should - * be registered as a consumer of a Spring Message Channel. - * - * @param messageSource The inbound of messages to subscribe to. - * @param processors Processors to be subscribed - * @param filter The filter that indicates which messages to forward. - * @param eventMessageConverter The message converter to use to convert spring message into event message - */ - public AxonProcessorMessageHandler(SubscribableMessageSource> messageSource, List>>> processors - , Predicate> filter, SpringMessageEventMessageConverter eventMessageConverter) { - this.messageSource = messageSource; - this.messageProcessors.addAll(processors); - this.filter = filter; - this.eventMessageConverter = eventMessageConverter; - } - - @Override - public Registration subscribe(Consumer>> messageProcessor) { - messageProcessors.add(messageProcessor); - return () -> messageProcessors.remove(messageProcessor); - } - - /** - * Subscribes this event listener to the event bus. - */ - @Override - protected void onInit() { - super.onInit(); - messageSource.subscribe(this::handle); - } - - - /** - * If allows by the filter, wraps the given {@code event} in a {@link GenericMessage} ands sends it to the - * configured {@link MessageChannel}. - * - * @param events the events to handle - */ - protected void handle(List> events) { - events.stream() - .filter(filter) - .forEach(event -> this.sendOutput(eventMessageConverter.toSpringMessage(event), null, false)); - } - - /** - * Handles the given {@code message}. If the filter refuses the message, it is ignored. - * - * @param message The message containing the event to publish - */ - @Override - protected void handleMessageInternal(Message message) throws Exception { - EventMessage eventMessage = eventMessageConverter.toEventMessage(message); - List> messages = singletonList(eventMessage); - for (Consumer>> messageProcessor : messageProcessors) { - messageProcessor.accept(messages); - } - } - -} From bd22770c0f5f40ee7b4845abef222d9d21920716 Mon Sep 17 00:00:00 2001 From: Mehdi Chitforoosh Date: Sun, 18 Apr 2021 18:09:46 +0430 Subject: [PATCH 5/9] Rename and refactor classes Add Javadocs,copyright Add builder for creating objects --- .../AxonInboundChannelAdapter.java | 82 ------- .../AxonOutboundChannelAdapter.java | 69 ------ .../AxonProcessorMessageHandler.java | 110 ---------- .../SpringStreamMessageProcessor.java | 207 ++++++++++++++++++ .../SpringStreamMessagePublisher.java | 162 ++++++++++++++ .../SpringStreamMessageSource.java | 150 +++++++++++++ .../DefaultSpringMessageConverter.java | 170 ++++++++++++++ ...ultSpringMessageEventMessageConverter.java | 97 -------- ...erter.java => SpringMessageConverter.java} | 12 +- 9 files changed, 696 insertions(+), 363 deletions(-) delete mode 100644 springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/AxonInboundChannelAdapter.java delete mode 100644 springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/AxonOutboundChannelAdapter.java delete mode 100644 springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/AxonProcessorMessageHandler.java create mode 100644 springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessageProcessor.java create mode 100644 springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessagePublisher.java create mode 100644 springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessageSource.java create mode 100644 springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/DefaultSpringMessageConverter.java delete mode 100644 springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/DefaultSpringMessageEventMessageConverter.java rename springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/{SpringMessageEventMessageConverter.java => SpringMessageConverter.java} (84%) diff --git a/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/AxonInboundChannelAdapter.java b/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/AxonInboundChannelAdapter.java deleted file mode 100644 index fe805bf..0000000 --- a/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/AxonInboundChannelAdapter.java +++ /dev/null @@ -1,82 +0,0 @@ -package org.axonframework.extensions.springcloud.eventhandling; - -import org.axonframework.eventhandling.EventMessage; -import org.axonframework.extensions.springcloud.eventhandling.converter.DefaultSpringMessageEventMessageConverter; -import org.axonframework.extensions.springcloud.eventhandling.converter.SpringMessageEventMessageConverter; -import org.axonframework.messaging.SubscribableMessageSource; -import org.springframework.integration.endpoint.MessageProducerSupport; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.support.GenericMessage; - -import java.util.List; -import java.util.function.Predicate; - -/** - * - * @author Mehdi Chitforoosh - * @since 4.1 - */ -public class AxonInboundChannelAdapter extends MessageProducerSupport { - - private final Predicate> filter; - private final SubscribableMessageSource> messageSource; - private final SpringMessageEventMessageConverter converter; - - /** - * Initialize an adapter to forward messages from the given {@code messageSource} to the given {@code channel}. - * Messages are not filtered; all messages are forwarded to the MessageChannel - * - * @param messageSource The event bus to subscribe to. - */ - public AxonInboundChannelAdapter(SubscribableMessageSource> messageSource) { - this(messageSource, m -> true, new DefaultSpringMessageEventMessageConverter()); - } - - /** - * Initialize an adapter to forward messages from the given {@code messageSource} to the given {@code channel}. - * Messages are filtered using the given {@code filter}. - * - * @param messageSource The inbound of messages to subscribe to. - * @param filter The filter that indicates which messages to forward. - */ - public AxonInboundChannelAdapter(SubscribableMessageSource> messageSource, Predicate> filter) { - this(messageSource, filter, new DefaultSpringMessageEventMessageConverter()); - } - - /** - * Initialize an adapter to forward messages from the given {@code messageSource} to the given {@code channel}. - * Messages are filtered using the given {@code filter}. - * - * @param messageSource The inbound of messages to subscribe to. - * @param filter The filter that indicates which messages to forward. - * @param converter The converter to use to convert event message into Spring message - */ - public AxonInboundChannelAdapter(SubscribableMessageSource> messageSource, Predicate> filter, SpringMessageEventMessageConverter converter) { - this.messageSource = messageSource; - this.filter = filter; - this.converter = converter; - } - - /** - * Subscribes this event listener to the event bus. - */ - @Override - protected void onInit() { - super.onInit(); - this.messageSource.subscribe(this::handle); - } - - - /** - * If allows by the filter, wraps the given {@code event} in a {@link GenericMessage} ands sends it to the - * configured {@link MessageChannel}. - * - * @param events the events to handle - */ - protected void handle(List> events) { - events.stream() - .filter(this.filter) - .forEach(event -> this.sendMessage(this.converter.toSpringMessage(event))); - } - -} diff --git a/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/AxonOutboundChannelAdapter.java b/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/AxonOutboundChannelAdapter.java deleted file mode 100644 index 253f03a..0000000 --- a/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/AxonOutboundChannelAdapter.java +++ /dev/null @@ -1,69 +0,0 @@ -package org.axonframework.extensions.springcloud.eventhandling; - -import org.axonframework.common.Registration; -import org.axonframework.eventhandling.EventBus; -import org.axonframework.eventhandling.EventMessage; -import org.axonframework.extensions.springcloud.eventhandling.converter.DefaultSpringMessageEventMessageConverter; -import org.axonframework.extensions.springcloud.eventhandling.converter.SpringMessageEventMessageConverter; -import org.axonframework.messaging.SubscribableMessageSource; -import org.springframework.integration.handler.AbstractMessageProducingHandler; -import org.springframework.messaging.Message; - -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.function.Consumer; - -import static java.util.Collections.singletonList; - -/** - * @author Mehdi Chitforoosh - * @since 4.1 - */ -public class AxonOutboundChannelAdapter extends AbstractMessageProducingHandler implements SubscribableMessageSource> { - - private final CopyOnWriteArrayList>>> messageProcessors = new CopyOnWriteArrayList<>(); - private final SpringMessageEventMessageConverter eventMessageConverter; - - /** - * Initialize an AxonOutboundChannelAdapter instance that sends all incoming Event Messages to the given - * {@code eventBus}. It is still possible for other Event Processors to subscribe to this MessageChannelAdapter. - * - * @param eventBus The EventBus instance for forward all messages to - */ - public AxonOutboundChannelAdapter(EventBus eventBus) { - this(singletonList(eventBus::publish), new DefaultSpringMessageEventMessageConverter()); - } - - /** - * Initialize the adapter to publish all incoming events to the subscribed processors. Note that this instance should - * be registered as a consumer of a Spring Message Channel. - * - * @param processors Processors to be subscribed - * @param eventMessageConverter The message converter to use to convert spring message into event message - */ - public AxonOutboundChannelAdapter(List>>> processors, SpringMessageEventMessageConverter eventMessageConverter) { - this.messageProcessors.addAll(processors); - this.eventMessageConverter = eventMessageConverter; - } - - @Override - public Registration subscribe(Consumer>> messageProcessor) { - messageProcessors.add(messageProcessor); - return () -> messageProcessors.remove(messageProcessor); - } - - /** - * Handles the given {@code message}. If the filter refuses the message, it is ignored. - * - * @param message The message containing the event to publish - */ - @Override - protected void handleMessageInternal(Message message) { - EventMessage eventMessage = eventMessageConverter.toEventMessage(message); - List> messages = singletonList(eventMessage); - for (Consumer>> messageProcessor : messageProcessors) { - messageProcessor.accept(messages); - } - } - -} diff --git a/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/AxonProcessorMessageHandler.java b/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/AxonProcessorMessageHandler.java deleted file mode 100644 index 216213f..0000000 --- a/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/AxonProcessorMessageHandler.java +++ /dev/null @@ -1,110 +0,0 @@ -package org.axonframework.extensions.springcloud.eventhandling; - -import org.axonframework.common.Registration; -import org.axonframework.eventhandling.EventBus; -import org.axonframework.eventhandling.EventMessage; -import org.axonframework.extensions.springcloud.eventhandling.converter.DefaultSpringMessageEventMessageConverter; -import org.axonframework.extensions.springcloud.eventhandling.converter.SpringMessageEventMessageConverter; -import org.axonframework.messaging.SubscribableMessageSource; -import org.springframework.integration.handler.AbstractMessageProducingHandler; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.support.GenericMessage; - -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.function.Consumer; -import java.util.function.Predicate; - -import static java.util.Collections.singletonList; - -/** - * @author Mehdi chitforoosh - * @since 4.1 - */ -public class AxonProcessorMessageHandler extends AbstractMessageProducingHandler implements SubscribableMessageSource> { - - private final SubscribableMessageSource> messageSource; - private final CopyOnWriteArrayList>>> messageProcessors = new CopyOnWriteArrayList<>(); - private final Predicate> filter; - private final SpringMessageEventMessageConverter eventMessageConverter; - - /** - * Initialize the adapter to publish all incoming events to the subscribed processors. Note that this instance should - * be registered as a consumer of a Spring Message Channel. - */ - public AxonProcessorMessageHandler(EventBus eventBus) { - this(eventBus, singletonList(eventBus::publish), m -> true, new DefaultSpringMessageEventMessageConverter()); - } - - /** - * Initialize an AxonProcessorMessageHandler instance that sends all incoming Event Messages to the given - * {@code eventBus}. It is still possible for other Event Processors to subscribe to this MessageChannelAdapter. - * - * @param eventBus The EventBus instance for forward all messages to - * @param filter The filter that indicates which messages to forward. - */ - public AxonProcessorMessageHandler(EventBus eventBus, Predicate> filter) { - this(eventBus, singletonList(eventBus::publish), filter, new DefaultSpringMessageEventMessageConverter()); - } - - /** - * Initialize the adapter to publish all incoming events to the subscribed processors. Note that this instance should - * be registered as a consumer of a Spring Message Channel. - * - * @param messageSource The inbound of messages to subscribe to. - * @param processors Processors to be subscribed - * @param filter The filter that indicates which messages to forward. - * @param eventMessageConverter The message converter to use to convert spring message into event message - */ - public AxonProcessorMessageHandler(SubscribableMessageSource> messageSource, List>>> processors - , Predicate> filter, SpringMessageEventMessageConverter eventMessageConverter) { - this.messageSource = messageSource; - this.messageProcessors.addAll(processors); - this.filter = filter; - this.eventMessageConverter = eventMessageConverter; - } - - @Override - public Registration subscribe(Consumer>> messageProcessor) { - messageProcessors.add(messageProcessor); - return () -> messageProcessors.remove(messageProcessor); - } - - /** - * Subscribes this event listener to the event bus. - */ - @Override - protected void onInit() { - super.onInit(); - messageSource.subscribe(this::handle); - } - - - /** - * If allows by the filter, wraps the given {@code event} in a {@link GenericMessage} ands sends it to the - * configured {@link MessageChannel}. - * - * @param events the events to handle - */ - protected void handle(List> events) { - events.stream() - .filter(filter) - .forEach(event -> this.sendOutput(eventMessageConverter.toSpringMessage(event), null, false)); - } - - /** - * Handles the given {@code message}. If the filter refuses the message, it is ignored. - * - * @param message The message containing the event to publish - */ - @Override - protected void handleMessageInternal(Message message) { - EventMessage eventMessage = eventMessageConverter.toEventMessage(message); - List> messages = singletonList(eventMessage); - for (Consumer>> messageProcessor : messageProcessors) { - messageProcessor.accept(messages); - } - } - -} diff --git a/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessageProcessor.java b/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessageProcessor.java new file mode 100644 index 0000000..380dfd0 --- /dev/null +++ b/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessageProcessor.java @@ -0,0 +1,207 @@ +/* + * Copyright (c) 2010-2021. Axon Framework + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.axonframework.extensions.springcloud.eventhandling; + +import org.axonframework.common.AxonConfigurationException; +import org.axonframework.common.Registration; +import org.axonframework.eventhandling.EventBus; +import org.axonframework.eventhandling.EventMessage; +import org.axonframework.extensions.springcloud.eventhandling.converter.SpringMessageConverter; +import org.axonframework.messaging.SubscribableMessageSource; +import org.springframework.integration.handler.AbstractMessageProducingHandler; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.GenericMessage; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Consumer; +import java.util.function.Predicate; + +import static java.util.Collections.singletonList; +import static org.axonframework.common.BuilderUtils.assertNonNull; + +/** + * Initialize the adapter to publish all incoming events to the subscribed processors. Note that this instance should + * be registered as a consumer of a Spring Message Channel. + * + * @author Mehdi chitforoosh + * @since 4.5 + */ +public class SpringStreamMessageProcessor extends AbstractMessageProducingHandler implements SubscribableMessageSource> { + + private final SubscribableMessageSource> messageSource; + private final CopyOnWriteArrayList>>> messageProcessors = new CopyOnWriteArrayList<>(); + private Predicate> filter; + private final SpringMessageConverter converter; + + /** + * Instantiate a {@link SpringStreamMessageProcessor} based on the fields contained in the {@link SpringStreamMessageProcessor.Builder}. + * The {@link SubscribableMessageSource} is a hard requirement and thus should be provided. + * The {@link SpringMessageConverter} is a hard requirement and thus should be provided. + *

+ * Will validate that the {@link SubscribableMessageSource} and {@link SpringMessageConverter} and {@link EventBus} are not {@code null}, + * and will throw an {@link AxonConfigurationException} if for either of them this holds. + * + * @param builder the {@link SpringStreamMessageProcessor.Builder} used to instantiate a {@link SpringStreamMessageProcessor} instance + */ + protected SpringStreamMessageProcessor(SpringStreamMessageProcessor.Builder builder) { + builder.validate(); + this.filter = builder.filter; + this.messageSource = builder.messageSource; + this.converter = builder.converter; + if (builder.eventBus != null) { + this.messageProcessors.addAll(singletonList(builder.eventBus::publish)); + } + // Subscribes this event message handler to the message source. + this.messageSource.subscribe(this::handle); + } + + /** + * Instantiate a Builder to be able to create a {@link SpringStreamMessageProcessor}. + * The {@link SubscribableMessageSource} is a hard requirement and as such should be provided. + * The {@link SpringMessageConverter} is a hard requirement and thus should be provided. + * + * @return a Builder to be able to create a {@link SpringStreamMessageProcessor}. + */ + public static SpringStreamMessageProcessor.Builder builder() { + return new SpringStreamMessageProcessor.Builder(); + } + + @Override + public Registration subscribe(Consumer>> messageProcessor) { + messageProcessors.add(messageProcessor); + return () -> messageProcessors.remove(messageProcessor); + } + + /** + * If allows by the filter, wraps the given {@link EventMessage} in a {@link GenericMessage} ands sends it to the + * configured {@link MessageChannel}. + * + * @param events the events to handle + */ + protected void handle(List> events) { + events.stream() + .filter(filter) + .forEach(event -> this.sendOutput(converter.createSpringMessage(event), null, false)); + } + + /** + * Handles the given {@link Message}. If the filter refuses the message, it is ignored. + * + * @param message The message containing the event to publish + */ + @Override + protected void handleMessageInternal(Message message) { + Optional> optional = converter.readSpringMessage(message); + optional.ifPresent(eventMessage -> { + List> messages = singletonList(eventMessage); + for (Consumer>> messageProcessor : messageProcessors) { + messageProcessor.accept(messages); + } + }); + } + + /** + * Set filter to remove filtered messages + * + * @param filter + */ + public void setFilter(Predicate> filter) { + this.filter = filter; + } + + /** + * Builder class to instantiate a {@link SpringStreamMessageProcessor}. + * The {@link SubscribableMessageSource} is a hard requirement and thus should be provided. + * The {@link SpringMessageConverter} is a hard requirement and thus should be provided. + */ + public static class Builder { + + private SubscribableMessageSource> messageSource; + private EventBus eventBus; + private Predicate> filter = m -> true; + private SpringMessageConverter converter; + + /** + * Sets the filter to send specific event messages. + * + * @param filter The filter to send filtered event messages + * @return the current Builder instance, for fluent interfacing + */ + public SpringStreamMessageProcessor.Builder filter(Predicate> filter) { + this.filter = filter; + return this; + } + + /** + * Sets the messageSource for subscribing handler. + * + * @param messageSource The messageSource to subscribe + * @return the current Builder instance, for fluent interfacing + */ + public SpringStreamMessageProcessor.Builder messageSource(SubscribableMessageSource> messageSource) { + assertNonNull(messageSource, "messageSource may not be null"); + this.messageSource = messageSource; + return this; + } + + /** + * Sets the eventBus to publish received event message. + * + * @param eventBus The eventBus to publish event messages + * @return the current Builder instance, for fluent interfacing + */ + public SpringStreamMessageProcessor.Builder eventBus(EventBus eventBus) { + this.eventBus = eventBus; + return this; + } + + /** + * Sets the converter to convert spring messages to event messages and versa. + * + * @param converter The converter to convert messages + * @return the current Builder instance, for fluent interfacing + */ + public SpringStreamMessageProcessor.Builder converter(SpringMessageConverter converter) { + assertNonNull(converter, "converter may not be null"); + this.converter = converter; + return this; + } + + /** + * Initializes a {@link SpringStreamMessageProcessor} as specified through this Builder. + * + * @return a {@link SpringStreamMessageProcessor} as specified through this Builder + */ + public SpringStreamMessageProcessor build() { + return new SpringStreamMessageProcessor(this); + } + + /** + * Validate whether the fields contained in this Builder as set accordingly. + * + * @throws AxonConfigurationException if one field is asserted to be incorrect according to the Builder's specifications + */ + protected void validate() { + assertNonNull(messageSource, "The MessageSource is a hard requirement and should be provided"); + assertNonNull(converter, "The Converter is a hard requirement and should be provided"); + } + } + +} diff --git a/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessagePublisher.java b/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessagePublisher.java new file mode 100644 index 0000000..17c7df5 --- /dev/null +++ b/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessagePublisher.java @@ -0,0 +1,162 @@ +/* + * Copyright (c) 2010-2021. Axon Framework + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.axonframework.extensions.springcloud.eventhandling; + +import org.axonframework.common.AxonConfigurationException; +import org.axonframework.eventhandling.EventMessage; +import org.axonframework.extensions.springcloud.eventhandling.converter.SpringMessageConverter; +import org.axonframework.messaging.SubscribableMessageSource; +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.GenericMessage; + +import java.util.List; +import java.util.function.Predicate; + +import static org.axonframework.common.BuilderUtils.assertNonNull; + +/** + * Initialize an adapter to forward messages from the given {@link SubscribableMessageSource} to the given {@link MessageChannel}. + * Messages are not filtered by default; all messages are forwarded to the MessageChannel. + * + * @author Mehdi Chitforoosh + * @since 4.5 + */ +public class SpringStreamMessagePublisher extends MessageProducerSupport { + + private Predicate> filter; + private final SubscribableMessageSource> messageSource; + private final SpringMessageConverter converter; + + /** + * Instantiate a {@link SpringStreamMessagePublisher} based on the fields contained in the {@link SpringStreamMessagePublisher.Builder}. + * The {@link SubscribableMessageSource} is a hard requirement and thus should be provided. + * The {@link SpringMessageConverter} is a hard requirement and thus should be provided. + *

+ * Will validate that the {@link SubscribableMessageSource} and {@link SpringMessageConverter} are not {@code null}, and will throw an + * {@link AxonConfigurationException} if for either of them this holds. + * + * @param builder the {@link SpringStreamMessagePublisher.Builder} used to instantiate a {@link SpringStreamMessagePublisher} instance + */ + protected SpringStreamMessagePublisher(SpringStreamMessagePublisher.Builder builder) { + builder.validate(); + this.filter = builder.filter; + this.messageSource = builder.messageSource; + this.converter = builder.converter; + // Subscribes this event message handler to the message source. + this.messageSource.subscribe(this::handle); + } + + /** + * Instantiate a Builder to be able to create a {@link SpringStreamMessagePublisher}. + * The {@link SubscribableMessageSource} is a hard requirement and as such should be provided. + * The {@link SpringMessageConverter} is a hard requirement and thus should be provided. + * + * @return a Builder to be able to create a {@link SpringStreamMessagePublisher}. + */ + public static SpringStreamMessagePublisher.Builder builder() { + return new SpringStreamMessagePublisher.Builder(); + } + + /** + * If allows by the filter, wraps the given {@link EventMessage} in a {@link GenericMessage} ands sends it to the + * configured {@link MessageChannel}. + * + * @param events the event messages to handle + */ + protected void handle(List> events) { + events.stream() + .filter(this.filter) + .forEach(event -> this.sendMessage(this.converter.createSpringMessage(event))); + } + + /** + * Set filter to remove filtered messages + * @param filter + */ + public void setFilter(Predicate> filter) { + this.filter = filter; + } + + /** + * Builder class to instantiate a {@link SpringStreamMessagePublisher}. + * The {@link SubscribableMessageSource} is a hard requirement and thus should be provided. + * The {@link SpringMessageConverter} is a hard requirement and thus should be provided. + */ + public static class Builder { + + private Predicate> filter = m -> true; + private SubscribableMessageSource> messageSource; + private SpringMessageConverter converter; + + /** + * Sets the filter to send specific event messages. + * + * @param filter The filter to send filtered event messages + * @return the current Builder instance, for fluent interfacing + */ + public SpringStreamMessagePublisher.Builder filter(Predicate> filter) { + this.filter = filter; + return this; + } + + /** + * Sets the messageSource for subscribing handler . + * + * @param messageSource The messageSource to subscribe + * @return the current Builder instance, for fluent interfacing + */ + public SpringStreamMessagePublisher.Builder messageSource(SubscribableMessageSource> messageSource) { + assertNonNull(messageSource, "messageSource may not be null"); + this.messageSource = messageSource; + return this; + } + + /** + * Sets the converter to convert spring messages to event messages and versa . + * + * @param converter The converter to convert messages + * @return the current Builder instance, for fluent interfacing + */ + public SpringStreamMessagePublisher.Builder converter(SpringMessageConverter converter) { + assertNonNull(converter, "converter may not be null"); + this.converter = converter; + return this; + } + + /** + * Initializes a {@link SpringStreamMessagePublisher} as specified through this Builder. + * + * @return a {@link SpringStreamMessagePublisher} as specified through this Builder + */ + public SpringStreamMessagePublisher build() { + return new SpringStreamMessagePublisher(this); + } + + /** + * Validate whether the fields contained in this Builder as set accordingly. + * + * @throws AxonConfigurationException if one field is asserted to be incorrect according to the Builder's + * specifications + */ + protected void validate() { + assertNonNull(messageSource, "The MessageSource is a hard requirement and should be provided"); + assertNonNull(converter, "The Converter is a hard requirement and should be provided"); + } + } + +} diff --git a/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessageSource.java b/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessageSource.java new file mode 100644 index 0000000..c98d2e6 --- /dev/null +++ b/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessageSource.java @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2010-2021. Axon Framework + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.axonframework.extensions.springcloud.eventhandling; + +import org.axonframework.common.AxonConfigurationException; +import org.axonframework.common.Registration; +import org.axonframework.eventhandling.EventBus; +import org.axonframework.eventhandling.EventMessage; +import org.axonframework.extensions.springcloud.eventhandling.converter.SpringMessageConverter; +import org.axonframework.messaging.SubscribableMessageSource; +import org.springframework.integration.handler.AbstractMessageProducingHandler; +import org.springframework.messaging.Message; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Consumer; + +import static java.util.Collections.singletonList; +import static org.axonframework.common.BuilderUtils.assertNonNull; + +/** + * Initialize an SpringStreamMessageSource instance that sends all incoming spring {@link Message} to the given + * {@link EventBus}. It is still possible for other Event Processors to subscribe to this MessageChannelAdapter. + * + * @author Mehdi Chitforoosh + * @since 4.5 + */ +public class SpringStreamMessageSource extends AbstractMessageProducingHandler implements SubscribableMessageSource> { + + private final CopyOnWriteArrayList>>> messageProcessors = new CopyOnWriteArrayList<>(); + private final SpringMessageConverter converter; + + /** + * Instantiate a {@link SpringStreamMessageSource} based on the fields contained in the {@link SpringStreamMessageSource.Builder}. + * The {@link EventBus} is a hard requirement and thus should be provided. + * The {@link SpringMessageConverter} is a hard requirement and thus should be provided. + *

+ * Will validate that the {@link EventBus} and {@link SpringMessageConverter} are not {@code null}, and will throw an + * {@link AxonConfigurationException} if for either of them this holds. + * + * @param builder the {@link SpringStreamMessageSource.Builder} used to instantiate a {@link SpringStreamMessageSource} instance + */ + protected SpringStreamMessageSource(SpringStreamMessageSource.Builder builder) { + builder.validate(); + if (builder.eventBus != null) { + this.messageProcessors.addAll(singletonList(builder.eventBus::publish)); + } + this.converter = builder.converter; + } + + /** + * Instantiate a Builder to be able to create a {@link SpringStreamMessageSource}. + * The {@link SpringMessageConverter} is a hard requirement and thus should be provided. + * + * @return a Builder to be able to create a {@link SpringStreamMessageSource}. + */ + public static SpringStreamMessageSource.Builder builder() { + return new SpringStreamMessageSource.Builder(); + } + + @Override + public Registration subscribe(Consumer>> messageProcessor) { + messageProcessors.add(messageProcessor); + return () -> messageProcessors.remove(messageProcessor); + } + + /** + * Handles the given {@link Message}. If the filter refuses the message, it is ignored. + * + * @param message The spring message containing the event to publish + */ + @Override + protected void handleMessageInternal(Message message) { + Optional> optional = converter.readSpringMessage(message); + optional.ifPresent(eventMessage -> { + List> messages = singletonList(eventMessage); + for (Consumer>> messageProcessor : messageProcessors) { + messageProcessor.accept(messages); + } + }); + } + + /** + * Builder class to instantiate a {@link SpringStreamMessageSource}. + * The {@link SpringMessageConverter} is a hard requirement and thus should be provided. + */ + public static class Builder { + + private EventBus eventBus; + private SpringMessageConverter converter; + + /** + * Sets the eventBus for publishing events. + * + * @param eventBus The messageSource to subscribe + * @return the current Builder instance, for fluent interfacing + */ + public SpringStreamMessageSource.Builder eventBus(EventBus eventBus) { + this.eventBus = eventBus; + return this; + } + + /** + * Sets the converter to convert spring messages to event messages and versa . + * + * @param converter The converter to convert messages + * @return the current Builder instance, for fluent interfacing + */ + public SpringStreamMessageSource.Builder converter(SpringMessageConverter converter) { + assertNonNull(converter, "converter may not be null"); + this.converter = converter; + return this; + } + + /** + * Initializes a {@link SpringStreamMessageSource} as specified through this Builder. + * + * @return a {@link SpringStreamMessageSource} as specified through this Builder + */ + public SpringStreamMessageSource build() { + return new SpringStreamMessageSource(this); + } + + /** + * Validate whether the fields contained in this Builder as set accordingly. + * + * @throws AxonConfigurationException if one field is asserted to be incorrect according to the Builder's + * specifications + */ + protected void validate() { + assertNonNull(converter, "The Converter is a hard requirement and should be provided"); + } + } + +} diff --git a/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/DefaultSpringMessageConverter.java b/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/DefaultSpringMessageConverter.java new file mode 100644 index 0000000..6d8e9d6 --- /dev/null +++ b/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/DefaultSpringMessageConverter.java @@ -0,0 +1,170 @@ +/* + * Copyright (c) 2010-2021. Axon Framework + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.axonframework.extensions.springcloud.eventhandling.converter; + +import org.axonframework.common.AxonConfigurationException; +import org.axonframework.common.DateTimeUtils; +import org.axonframework.eventhandling.EventMessage; +import org.axonframework.eventhandling.GenericDomainEventMessage; +import org.axonframework.eventhandling.GenericEventMessage; +import org.axonframework.messaging.Headers; +import org.axonframework.messaging.MetaData; +import org.axonframework.serialization.*; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.GenericMessage; + +import java.util.*; + +import static org.axonframework.common.BuilderUtils.assertNonNull; +import static org.axonframework.common.DateTimeUtils.formatInstant; +import static org.axonframework.messaging.Headers.MESSAGE_TIMESTAMP; + +/** + * Default implementation of the SpringMessageConverter interface. This implementation will suffice in most cases. It + * passes all meta-data entries as headers (with 'axon-metadata-' prefix) to the message. Other message-specific + * attributes are also added as meta data. The message payload is serialized using the configured serializer and passed + * as the message body. + * + * @author Mehdi Chitforoosh + * @since 4.5 + */ +public class DefaultSpringMessageConverter implements SpringMessageConverter { + + private Serializer serializer; + + /** + * Instantiate a {@link DefaultSpringMessageConverter} based on the fields contained in the {@link Builder}. + * The {@link Serializer} is a hard requirement and thus should be provided. + *

+ * Will validate that the {@link Serializer} is not {@code null}, and will throw an + * {@link AxonConfigurationException} if is null. + * + * @param builder the {@link Builder} used to instantiate a {@link DefaultSpringMessageConverter} instance + */ + protected DefaultSpringMessageConverter(Builder builder) { + builder.validate(); + this.serializer = builder.serializer; + } + + /** + * Instantiate a Builder to be able to create a {@link DefaultSpringMessageConverter}. + * The {@link Serializer} is a hard requirement and as such should be provided. + * + * @return a Builder to be able to create a {@link DefaultSpringMessageConverter}. + */ + public static Builder builder() { + return new Builder(); + } + + @Override + public Message createSpringMessage(EventMessage eventMessage) { + if (eventMessage == null || eventMessage.getPayload() == null) { + throw new NullPointerException("Event message or payload should not be null"); + } + Object payload = eventMessage.getPayload(); + SerializedObject serializedObject = eventMessage.serializePayload(serializer, payload.getClass()); + Map headers = new HashMap<>(); + eventMessage.getMetaData().forEach((k, v) -> headers.put(Headers.MESSAGE_METADATA + "-" + k, v)); + Headers.defaultHeaders(eventMessage, serializedObject).forEach((k, v) -> { + if (k.equals(MESSAGE_TIMESTAMP)) { + headers.put(k, formatInstant(eventMessage.getTimestamp())); + } else { + headers.put(k, v); + } + }); + return new GenericMessage<>(serializedObject.getData(), new DefaultSpringMessageConverter.SettableTimestampMessageHeaders(headers, eventMessage.getTimestamp().toEpochMilli())); + } + + @Override + public Optional> readSpringMessage(Message message) { + if (message == null) { + return Optional.empty(); + } + MessageHeaders headers = message.getHeaders(); + if (!headers.keySet().containsAll(Arrays.asList(Headers.MESSAGE_ID, Headers.MESSAGE_TYPE))) { + return Optional.empty(); + } + Object payload = message.getPayload(); + Map metaData = new HashMap<>(); + headers.forEach((k, v) -> { + if (k.startsWith(Headers.MESSAGE_METADATA + "-")) { + metaData.put(k.substring((Headers.MESSAGE_METADATA + "-").length()), v); + } + }); + SimpleSerializedObject serializedMessage = new SimpleSerializedObject(payload, payload.getClass(), + Objects.toString(headers.get(Headers.MESSAGE_TYPE)), + Objects.toString(headers.get(Headers.MESSAGE_REVISION), null)); + SerializedMessage delegateMessage = new SerializedMessage<>(Objects.toString(headers.get(Headers.MESSAGE_ID)), + new LazyDeserializingObject<>(serializedMessage, serializer), + new LazyDeserializingObject<>(MetaData.from(metaData))); + String timestamp = Objects.toString(headers.get(MESSAGE_TIMESTAMP)); + if (headers.containsKey(Headers.AGGREGATE_ID)) { + return Optional.of(new GenericDomainEventMessage<>(Objects.toString(headers.get(Headers.AGGREGATE_TYPE)), + Objects.toString(headers.get(Headers.AGGREGATE_ID)), + (Long) headers.get(Headers.AGGREGATE_SEQ), + delegateMessage, () -> DateTimeUtils.parseInstant(timestamp))); + } else { + return Optional.of(new GenericEventMessage<>(delegateMessage, () -> DateTimeUtils.parseInstant(timestamp))); + } + } + + private static class SettableTimestampMessageHeaders extends MessageHeaders { + protected SettableTimestampMessageHeaders(Map headers, Long timestamp) { + super(headers, null, timestamp); + } + } + + /** + * Builder class to instantiate a {@link DefaultSpringMessageConverter}. + * The {@link Serializer} is a hard requirement and thus should be provided. + */ + public static class Builder { + + private Serializer serializer; + + /** + * Sets the serializer to serialize the Event Message's payload and Meta Data with. + * + * @param serializer The serializer to serialize the Event Message's payload and Meta Data with + * @return the current Builder instance, for fluent interfacing + */ + public Builder serializer(Serializer serializer) { + assertNonNull(serializer, "Serializer may not be null"); + this.serializer = serializer; + return this; + } + + /** + * Initializes a {@link DefaultSpringMessageConverter} as specified through this Builder. + * + * @return a {@link DefaultSpringMessageConverter} as specified through this Builder + */ + public DefaultSpringMessageConverter build() { + return new DefaultSpringMessageConverter(this); + } + + /** + * Validate whether the fields contained in this Builder as set accordingly. + * + * @throws AxonConfigurationException if one field is asserted to be incorrect according to the Builder's + * specifications + */ + protected void validate() { + assertNonNull(serializer, "The Serializer is a hard requirement and should be provided"); + } + } +} diff --git a/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/DefaultSpringMessageEventMessageConverter.java b/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/DefaultSpringMessageEventMessageConverter.java deleted file mode 100644 index 710099c..0000000 --- a/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/DefaultSpringMessageEventMessageConverter.java +++ /dev/null @@ -1,97 +0,0 @@ -package org.axonframework.extensions.springcloud.eventhandling.converter; - -import org.axonframework.common.Assert; -import org.axonframework.common.DateTimeUtils; -import org.axonframework.eventhandling.EventMessage; -import org.axonframework.eventhandling.GenericDomainEventMessage; -import org.axonframework.eventhandling.GenericEventMessage; -import org.axonframework.messaging.Headers; -import org.axonframework.messaging.MetaData; -import org.axonframework.serialization.*; -import org.axonframework.serialization.json.JacksonSerializer; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.support.GenericMessage; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; - -import static org.axonframework.common.DateTimeUtils.formatInstant; -import static org.axonframework.messaging.Headers.MESSAGE_TIMESTAMP; - -/** - * Default byte[] spring message to event message converter based on Jackson serializer. - * - * @Mehdi Chitforoosh - * @since 4.1 - */ -public class DefaultSpringMessageEventMessageConverter implements SpringMessageEventMessageConverter { - - private final Serializer serializer; - - public DefaultSpringMessageEventMessageConverter() { - this.serializer = JacksonSerializer.builder().build(); - } - - public DefaultSpringMessageEventMessageConverter(Serializer serializer) { - Assert.notNull(serializer, () -> "Serializer may not be null"); - this.serializer = serializer; - } - - @Override - public Message toSpringMessage(EventMessage eventMessage) { - SerializedObject serializedObject = eventMessage.serializePayload(serializer, byte[].class); - Map headers = new HashMap<>(); - eventMessage.getMetaData().forEach((k, v) -> headers.put(Headers.MESSAGE_METADATA + "-" + k, v)); - Headers.defaultHeaders(eventMessage, serializedObject).forEach((k, v) -> { - if (k.equals(MESSAGE_TIMESTAMP)) { - headers.put(k, formatInstant(eventMessage.getTimestamp())); - } else { - headers.put(k, v); - } - }); - return new GenericMessage<>(serializedObject.getData(), new DefaultSpringMessageEventMessageConverter.SettableTimestampMessageHeaders(headers, eventMessage.getTimestamp().toEpochMilli())); - - } - - @Override - public EventMessage toEventMessage(Message message) { - if (!(message.getPayload() instanceof byte[])) { - throw new IllegalArgumentException("message payload should be byte[]"); - } - MessageHeaders headers = message.getHeaders(); - if (!headers.keySet().containsAll(Arrays.asList(Headers.MESSAGE_ID, Headers.MESSAGE_TYPE))) { - throw new IllegalArgumentException("axon message id or axon message type doesn't exist."); - } - byte[] payload = (byte[]) message.getPayload(); - Map metaData = new HashMap<>(); - headers.forEach((k, v) -> { - if (k.startsWith(Headers.MESSAGE_METADATA + "-")) { - metaData.put(k.substring((Headers.MESSAGE_METADATA + "-").length()), v); - } - }); - SimpleSerializedObject serializedMessage = new SimpleSerializedObject<>(payload, byte[].class, - Objects.toString(headers.get(Headers.MESSAGE_TYPE)), - Objects.toString(headers.get(Headers.MESSAGE_REVISION), null)); - SerializedMessage delegateMessage = new SerializedMessage<>(Objects.toString(headers.get(Headers.MESSAGE_ID)), - new LazyDeserializingObject<>(serializedMessage, serializer), - new LazyDeserializingObject<>(MetaData.from(metaData))); - String timestamp = Objects.toString(headers.get(MESSAGE_TIMESTAMP)); - if (headers.containsKey(Headers.AGGREGATE_ID)) { - return new GenericDomainEventMessage<>(Objects.toString(headers.get(Headers.AGGREGATE_TYPE)), - Objects.toString(headers.get(Headers.AGGREGATE_ID)), - (Long) headers.get(Headers.AGGREGATE_SEQ), - delegateMessage, () -> DateTimeUtils.parseInstant(timestamp)); - } else { - return new GenericEventMessage<>(delegateMessage, () -> DateTimeUtils.parseInstant(timestamp)); - } - } - - private static class SettableTimestampMessageHeaders extends MessageHeaders { - protected SettableTimestampMessageHeaders(Map headers, Long timestamp) { - super(headers, null, timestamp); - } - } -} diff --git a/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/SpringMessageEventMessageConverter.java b/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/SpringMessageConverter.java similarity index 84% rename from springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/SpringMessageEventMessageConverter.java rename to springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/SpringMessageConverter.java index 1545597..d622984 100644 --- a/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/SpringMessageEventMessageConverter.java +++ b/springcloud-streams/src/main/java/org/axonframework/extensions/springcloud/eventhandling/converter/SpringMessageConverter.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010-2014. Axon Framework + * Copyright (c) 2010-2021. Axon Framework * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,13 +19,15 @@ import org.axonframework.eventhandling.EventMessage; import org.springframework.messaging.Message; +import java.util.Optional; + /** * Interface describing a mechanism that converts Spring Messages from an Axon Messages and vice versa. * * @author Mehdi Chitforoosh - * @since 4.1 + * @since 4.5 */ -public interface SpringMessageEventMessageConverter { +public interface SpringMessageConverter { /** * Creates an Spring Message from given {@code eventMessage}. @@ -34,7 +36,7 @@ public interface SpringMessageEventMessageConverter { * @return an Spring Message containing the payload and headers * Broker. */ - Message toSpringMessage(EventMessage eventMessage); + Message createSpringMessage(EventMessage eventMessage); /** * Reconstruct an EventMessage from the given spring message. The returned value @@ -43,5 +45,5 @@ public interface SpringMessageEventMessageConverter { * @param message spring message * @return The Event Message to publish on the local event processors */ - EventMessage toEventMessage(Message message); + Optional> readSpringMessage(Message message); } From a6507d40ec0d8fd8e14f293ae0b3793eb055157b Mon Sep 17 00:00:00 2001 From: Mehdi Chitforoosh Date: Sun, 18 Apr 2021 18:20:13 +0430 Subject: [PATCH 6/9] Add test classes for spring cloud stream module --- .../SpringStreamMessageProcessorTest.java | 116 ++++++++ .../SpringStreamMessagePublisherTest.java | 255 ++++++++++++++++++ ...eamMessageSourceAndPublisherMixedTest.java | 119 ++++++++ .../SpringStreamMessageSourceTest.java | 126 +++++++++ .../MessageProcessorTestConfiguration.java | 18 ++ .../MessagePublisherTestConfiguration.java | 27 ++ .../MessageSourceTestConfiguration.java | 16 ++ .../configuration/PublisherTestChannels.java | 19 ++ .../configuration/SourceTestChannels.java | 19 ++ .../converter/SpringMessageConverterTest.java | 190 +++++++++++++ .../eventhandling/utils/TestSerializer.java | 44 +++ 11 files changed, 949 insertions(+) create mode 100644 springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessageProcessorTest.java create mode 100644 springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessagePublisherTest.java create mode 100644 springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessageSourceAndPublisherMixedTest.java create mode 100644 springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessageSourceTest.java create mode 100644 springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/MessageProcessorTestConfiguration.java create mode 100644 springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/MessagePublisherTestConfiguration.java create mode 100644 springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/MessageSourceTestConfiguration.java create mode 100644 springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/PublisherTestChannels.java create mode 100644 springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/SourceTestChannels.java create mode 100644 springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/converter/SpringMessageConverterTest.java create mode 100644 springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/utils/TestSerializer.java diff --git a/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessageProcessorTest.java b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessageProcessorTest.java new file mode 100644 index 0000000..c63363f --- /dev/null +++ b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessageProcessorTest.java @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2010-2021. Axon Framework + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.axonframework.extensions.springcloud.eventhandling; + +import org.axonframework.eventhandling.EventBus; +import org.axonframework.eventhandling.EventMessage; +import org.axonframework.eventhandling.GenericEventMessage; +import org.axonframework.eventhandling.SimpleEventBus; +import org.axonframework.extensions.springcloud.eventhandling.configuration.MessageProcessorTestConfiguration; +import org.axonframework.extensions.springcloud.eventhandling.converter.DefaultSpringMessageConverter; +import org.axonframework.extensions.springcloud.eventhandling.converter.SpringMessageConverter; +import org.axonframework.extensions.springcloud.eventhandling.utils.TestSerializer; +import org.axonframework.messaging.MetaData; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit4.SpringRunner; + +import java.util.List; +import java.util.function.Consumer; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +/** + * Test class validating the {@link SpringStreamMessageProcessor}. + * + * @author Mehdi Chitforoosh + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = MessageProcessorTestConfiguration.class) +@DirtiesContext +class SpringStreamMessageProcessorTest { + + @Autowired + private Processor testChannels; + + private EventBus eventBus; + + private SpringMessageConverter converter; + + private SpringStreamMessageProcessor testSubject; + + @BeforeEach + void setUp() { + eventBus = SimpleEventBus.builder().build(); + converter = DefaultSpringMessageConverter.builder() + .serializer(TestSerializer.secureXStreamSerializer()) + .build(); + testSubject = SpringStreamMessageProcessor.builder() + .messageSource(eventBus) + .converter(converter) + .build(); + testSubject.setOutputChannel(testChannels.output()); + testChannels.input().subscribe(testSubject); + } + + @AfterEach + void clean() { + testChannels.input().unsubscribe(testSubject); + } + + @Test + void testMessageListenerInvokesAllEventProcessors() { + Consumer>> eventProcessor = mock(Consumer.class); + String payload = "testMessage"; + EventMessage eventMessage = GenericEventMessage.asEventMessage(payload); + + testSubject.subscribe(eventProcessor); + + eventBus.publish(eventMessage); + + verify(eventProcessor).accept(argThat(item -> item.size() == 1 && item.get(0).getPayload().equals(payload))); + } + + @Test + void testMessageListenerArgument() { + String payload = "testMessage"; + EventMessage eventMessage = GenericEventMessage.asEventMessage(payload) + .withMetaData(MetaData.with("key", "value")); + + Consumer>> eventProcessor = receivedEventMessages -> { + EventMessage em = receivedEventMessages.get(0); + assertEquals(payload, em.getPayload()); + assertEquals(eventMessage.getIdentifier(), em.getIdentifier()); + assertEquals(eventMessage.getTimestamp(), em.getTimestamp()); + assertEquals("value", em.getMetaData().get("key")); + }; + + testSubject.subscribe(eventProcessor); + + eventBus.publish(eventMessage); + } + +} diff --git a/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessagePublisherTest.java b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessagePublisherTest.java new file mode 100644 index 0000000..37e0563 --- /dev/null +++ b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessagePublisherTest.java @@ -0,0 +1,255 @@ +/* + * Copyright (c) 2010-2021. Axon Framework + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.axonframework.extensions.springcloud.eventhandling; + +import org.axonframework.eventhandling.*; +import org.axonframework.extensions.springcloud.eventhandling.configuration.MessagePublisherTestConfiguration; +import org.axonframework.extensions.springcloud.eventhandling.configuration.PublisherTestChannels; +import org.axonframework.extensions.springcloud.eventhandling.converter.DefaultSpringMessageConverter; +import org.axonframework.extensions.springcloud.eventhandling.converter.SpringMessageConverter; +import org.axonframework.messaging.Headers; +import org.axonframework.messaging.MetaData; +import org.axonframework.serialization.Serializer; +import org.axonframework.serialization.SimpleSerializedObject; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.stream.test.binder.MessageCollector; +import org.springframework.messaging.Message; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit4.SpringRunner; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.axonframework.common.DateTimeUtils.formatInstant; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.Mockito.*; + +/** + * Test class validating the {@link SpringStreamMessagePublisher}. + * + * @author Mehdi Chitforoosh + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = MessagePublisherTestConfiguration.class) +@DirtiesContext +class SpringStreamMessagePublisherTest { + + @Autowired + private PublisherTestChannels testChannels; + + @Autowired + private MessageCollector messageCollector; + + private EventBus eventBus; + + private Serializer serializer; + private SpringMessageConverter converter; + + private SpringStreamMessagePublisher testSubject; + + @BeforeEach + void setUp() { + eventBus = SimpleEventBus.builder().build(); + serializer = mock(Serializer.class); + converter = spy(DefaultSpringMessageConverter.builder() + .serializer(serializer) + .build()); + testSubject = new SpringStreamMessagePublisher.Builder() + .messageSource(eventBus) + .converter(converter) + .build(); + testSubject.setOutputChannel(testChannels.input()); + } + + @Test + void testSendOneMessage() { + String payload = "testMessage"; + EventMessage eventMessage = GenericEventMessage.asEventMessage(payload) + .withMetaData(MetaData.with("key", "value")); + + SimpleSerializedObject testObject = + new SimpleSerializedObject(payload, payload.getClass(), payload.getClass().getName(), "0"); + when(serializer.serialize(payload, payload.getClass())).thenAnswer(invocationOnMock -> testObject); + + eventBus.publish(eventMessage); + // Messages emitted by the Spring Stream Cloud test support in a messageCollector + Message message = messageCollector.forChannel(testChannels.output()).poll(); + + assertEquals(payload, message.getPayload()); + assertEquals(eventMessage.getIdentifier(), message.getHeaders().get(Headers.MESSAGE_ID)); + assertEquals(eventMessage.getPayloadType().getName(), message.getHeaders().get(Headers.MESSAGE_TYPE)); + assertEquals("0", message.getHeaders().get(Headers.MESSAGE_REVISION)); + assertEquals(formatInstant(eventMessage.getTimestamp()), message.getHeaders().get(Headers.MESSAGE_TIMESTAMP)); + assertEquals("value", message.getHeaders().get(Headers.MESSAGE_METADATA + "-" + "key")); + + verify(converter, times(1)).createSpringMessage(any(EventMessage.class)); + } + + @Test + void testOneDomainEventMessage() { + String payload = "testMessage"; + GenericDomainEventMessage domainEventMessage = + new GenericDomainEventMessage<>(payload.getClass().getName(), "testId", 1L, payload, MetaData.with("key", "value")); + + SimpleSerializedObject testObject = + new SimpleSerializedObject(payload, payload.getClass(), payload.getClass().getName(), "0"); + when(serializer.serialize(payload, payload.getClass())).thenAnswer(invocationOnMock -> testObject); + + eventBus.publish(domainEventMessage); + // Messages emitted by the Spring Stream Cloud test support in a messageCollector + Message messageContainsDomainEventMessage = messageCollector.forChannel(testChannels.output()).poll(); + + assertEquals(payload, messageContainsDomainEventMessage.getPayload()); + assertEquals(domainEventMessage.getIdentifier(), messageContainsDomainEventMessage.getHeaders().get(Headers.MESSAGE_ID)); + assertEquals(domainEventMessage.getPayloadType().getName(), messageContainsDomainEventMessage.getHeaders().get(Headers.MESSAGE_TYPE)); + assertEquals("0", messageContainsDomainEventMessage.getHeaders().get(Headers.MESSAGE_REVISION)); + assertEquals(formatInstant(domainEventMessage.getTimestamp()), messageContainsDomainEventMessage.getHeaders().get(Headers.MESSAGE_TIMESTAMP)); + assertEquals("value", messageContainsDomainEventMessage.getHeaders().get(Headers.MESSAGE_METADATA + "-" + "key")); + assertEquals(((DomainEventMessage) domainEventMessage).getAggregateIdentifier(), messageContainsDomainEventMessage.getHeaders().get(Headers.AGGREGATE_ID)); + assertEquals(((DomainEventMessage) domainEventMessage).getSequenceNumber(), messageContainsDomainEventMessage.getHeaders().get(Headers.AGGREGATE_SEQ)); + assertEquals(((DomainEventMessage) domainEventMessage).getType(), messageContainsDomainEventMessage.getHeaders().get(Headers.AGGREGATE_TYPE)); + + verify(converter, times(1)).createSpringMessage(any(EventMessage.class)); + } + + @Test + void testSendMultipleMessages() { + // Given + String firstPayload = "testMessage1"; + String secondPayload = "testMessage2"; + String thirdPayload = "testMessage3"; + EventMessage firstEventMessage = GenericEventMessage.asEventMessage(firstPayload) + .withMetaData(MetaData.with("key", "value")); + EventMessage secondEventMessage = GenericEventMessage.asEventMessage(secondPayload) + .withMetaData(MetaData.with("key", "value")); + EventMessage thirdEventMessage = GenericEventMessage.asEventMessage(thirdPayload) + .withMetaData(MetaData.with("key", "value")); + List> list = new ArrayList<>(Arrays.asList(firstEventMessage, secondEventMessage, thirdEventMessage)); + + SimpleSerializedObject firstTestObject = + new SimpleSerializedObject(firstPayload, firstPayload.getClass(), firstPayload.getClass().getName(), "0"); + SimpleSerializedObject secondTestObject = + new SimpleSerializedObject(secondPayload, secondPayload.getClass(), secondPayload.getClass().getName(), "0"); + SimpleSerializedObject thirdTestObject = + new SimpleSerializedObject(thirdPayload, thirdPayload.getClass(), thirdPayload.getClass().getName(), "0"); + when(serializer.serialize(anyString(), any())).thenAnswer(invocationOnMock -> { + String arg0 = invocationOnMock.getArgument(0); + if (arg0.equals("testMessage1")) { + return firstTestObject; + } else if (arg0.equals("testMessage2")) { + return secondTestObject; + } else { + return thirdTestObject; + } + }); + + // When + eventBus.publish(list); + // Messages emitted by the Spring Stream Cloud test support in a messageCollector + Message firstMessage = messageCollector.forChannel(testChannels.output()).poll(); + Message secondMessage = messageCollector.forChannel(testChannels.output()).poll(); + Message thirdMessage = messageCollector.forChannel(testChannels.output()).poll(); + + // Then + assertEquals(firstPayload, firstMessage.getPayload()); + assertEquals(firstEventMessage.getIdentifier(), firstMessage.getHeaders().get(Headers.MESSAGE_ID)); + assertEquals(firstEventMessage.getPayloadType().getName(), firstMessage.getHeaders().get(Headers.MESSAGE_TYPE)); + assertEquals("0", firstMessage.getHeaders().get(Headers.MESSAGE_REVISION)); + assertEquals(formatInstant(firstEventMessage.getTimestamp()), firstMessage.getHeaders().get(Headers.MESSAGE_TIMESTAMP)); + assertEquals("value", firstMessage.getHeaders().get(Headers.MESSAGE_METADATA + "-" + "key")); + + assertEquals(secondPayload, secondMessage.getPayload()); + assertEquals(secondEventMessage.getIdentifier(), secondMessage.getHeaders().get(Headers.MESSAGE_ID)); + assertEquals(secondEventMessage.getPayloadType().getName(), secondMessage.getHeaders().get(Headers.MESSAGE_TYPE)); + assertEquals("0", secondMessage.getHeaders().get(Headers.MESSAGE_REVISION)); + assertEquals(formatInstant(secondEventMessage.getTimestamp()), secondMessage.getHeaders().get(Headers.MESSAGE_TIMESTAMP)); + assertEquals("value", secondMessage.getHeaders().get(Headers.MESSAGE_METADATA + "-" + "key")); + + assertEquals(thirdPayload, thirdMessage.getPayload()); + assertEquals(thirdEventMessage.getIdentifier(), thirdMessage.getHeaders().get(Headers.MESSAGE_ID)); + assertEquals(thirdEventMessage.getPayloadType().getName(), thirdMessage.getHeaders().get(Headers.MESSAGE_TYPE)); + assertEquals("0", thirdMessage.getHeaders().get(Headers.MESSAGE_REVISION)); + assertEquals(formatInstant(thirdEventMessage.getTimestamp()), thirdMessage.getHeaders().get(Headers.MESSAGE_TIMESTAMP)); + assertEquals("value", thirdMessage.getHeaders().get(Headers.MESSAGE_METADATA + "-" + "key")); + + verify(converter, times(3)).createSpringMessage(any(EventMessage.class)); + } + + @Test + void testSendFilteredMessages() { + String firstPayload = "testMessage1"; + String secondPayload = "testMessage2"; + String thirdPayload = "testMessage3"; + EventMessage firstEventMessage = GenericEventMessage.asEventMessage(firstPayload) + .withMetaData(MetaData.with("key", "value")); + EventMessage secondEventMessage = GenericEventMessage.asEventMessage(secondPayload) + .withMetaData(MetaData.with("key", "value")); + EventMessage thirdEventMessage = GenericEventMessage.asEventMessage(thirdPayload) + .withMetaData(MetaData.with("key", "value")); + List> list = new ArrayList<>(Arrays.asList(firstEventMessage, secondEventMessage, thirdEventMessage)); + + SimpleSerializedObject firstSerializedObject = + new SimpleSerializedObject(firstPayload, firstPayload.getClass(), firstPayload.getClass().getName(), "0"); + SimpleSerializedObject secondSerializedObject = + new SimpleSerializedObject(secondPayload, secondPayload.getClass(), secondPayload.getClass().getName(), "0"); + SimpleSerializedObject thirdSerializedObject = + new SimpleSerializedObject(thirdPayload, thirdPayload.getClass(), thirdPayload.getClass().getName(), "0"); + when(serializer.serialize(anyString(), any())).thenAnswer(invocationOnMock -> { + String arg0 = invocationOnMock.getArgument(0); + if (arg0.equals("testMessage1")) { + return firstSerializedObject; + } else if (arg0.equals("testMessage2")) { + return secondSerializedObject; + } else { + return thirdSerializedObject; + } + }); + //Set filter to remove second event message + testSubject.setFilter(eventMessage -> !eventMessage.getPayload().equals("testMessage2")); + + eventBus.publish(list); + // Messages emitted by the Spring Stream Cloud test support in a messageCollector + Message firstMessage = messageCollector.forChannel(testChannels.output()).poll(); + Message secondMessage = messageCollector.forChannel(testChannels.output()).poll(); + Message thirdMessage = messageCollector.forChannel(testChannels.output()).poll(); + + assertEquals(firstPayload, firstMessage.getPayload()); + assertEquals(firstEventMessage.getIdentifier(), firstMessage.getHeaders().get(Headers.MESSAGE_ID)); + assertEquals(firstEventMessage.getPayloadType().getName(), firstMessage.getHeaders().get(Headers.MESSAGE_TYPE)); + assertEquals("0", firstMessage.getHeaders().get(Headers.MESSAGE_REVISION)); + assertEquals(formatInstant(firstEventMessage.getTimestamp()), firstMessage.getHeaders().get(Headers.MESSAGE_TIMESTAMP)); + assertEquals("value", firstMessage.getHeaders().get(Headers.MESSAGE_METADATA + "-" + "key")); + + assertEquals(thirdPayload, secondMessage.getPayload()); + assertEquals(thirdEventMessage.getIdentifier(), secondMessage.getHeaders().get(Headers.MESSAGE_ID)); + assertEquals(thirdEventMessage.getPayloadType().getName(), secondMessage.getHeaders().get(Headers.MESSAGE_TYPE)); + assertEquals("0", secondMessage.getHeaders().get(Headers.MESSAGE_REVISION)); + assertEquals(formatInstant(thirdEventMessage.getTimestamp()), secondMessage.getHeaders().get(Headers.MESSAGE_TIMESTAMP)); + assertEquals("value", secondMessage.getHeaders().get(Headers.MESSAGE_METADATA + "-" + "key")); + + assertNull(thirdMessage); + + verify(converter, times(2)).createSpringMessage(any(EventMessage.class)); + } + +} diff --git a/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessageSourceAndPublisherMixedTest.java b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessageSourceAndPublisherMixedTest.java new file mode 100644 index 0000000..0bb7670 --- /dev/null +++ b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessageSourceAndPublisherMixedTest.java @@ -0,0 +1,119 @@ +/* + * Copyright (c) 2010-2021. Axon Framework + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.axonframework.extensions.springcloud.eventhandling; + +import org.axonframework.eventhandling.EventBus; +import org.axonframework.eventhandling.EventMessage; +import org.axonframework.eventhandling.GenericEventMessage; +import org.axonframework.eventhandling.SimpleEventBus; +import org.axonframework.extensions.springcloud.eventhandling.configuration.MessageProcessorTestConfiguration; +import org.axonframework.extensions.springcloud.eventhandling.converter.DefaultSpringMessageConverter; +import org.axonframework.extensions.springcloud.eventhandling.converter.SpringMessageConverter; +import org.axonframework.extensions.springcloud.eventhandling.utils.TestSerializer; +import org.axonframework.messaging.MetaData; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit4.SpringRunner; + +import java.util.List; +import java.util.function.Consumer; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +/** + * @author Mehdi Chitforoosh + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = MessageProcessorTestConfiguration.class) +@DirtiesContext +class SpringStreamMessageSourceAndPublisherMixedTest { + + @Autowired + private Processor testChannels; + + private EventBus eventBus; + + private SpringMessageConverter converter; + + private SpringStreamMessagePublisher messagePublisher; + + private SpringStreamMessageSource messageSource; + + @BeforeEach + void setUp() { + eventBus = SimpleEventBus.builder().build(); + converter = DefaultSpringMessageConverter.builder() + .serializer(TestSerializer.secureXStreamSerializer()) + .build(); + messagePublisher = SpringStreamMessagePublisher.builder() + .messageSource(eventBus) + .converter(converter) + .build(); + messageSource = SpringStreamMessageSource.builder() + .converter(converter) + .build(); + messagePublisher.setOutputChannel(testChannels.output()); + testChannels.input().subscribe(messageSource); + } + + @AfterEach + void clean() { + testChannels.input().unsubscribe(messageSource); + } + + @Test + void testMessageListenerInvokes() { + Consumer>> eventProcessor = mock(Consumer.class); + String payload = "testMessage"; + EventMessage eventMessage = GenericEventMessage.asEventMessage(payload); + + messageSource.subscribe(eventProcessor); + + eventBus.publish(eventMessage); + + verify(eventProcessor).accept(argThat(item -> item.size() == 1 && item.get(0).getPayload().equals(payload))); + } + + @Test + void testMessageListenerArgument() { + String payload = "testMessage"; + EventMessage eventMessage = GenericEventMessage.asEventMessage(payload) + .withMetaData(MetaData.with("key", "value")); + + Consumer>> eventProcessor = receivedEventMessages -> { + EventMessage em = receivedEventMessages.get(0); + assertEquals(payload, em.getPayload()); + assertEquals(eventMessage.getIdentifier(), em.getIdentifier()); + assertEquals(eventMessage.getTimestamp(), em.getTimestamp()); + assertEquals("value", em.getMetaData().get("key")); + }; + + messageSource.subscribe(eventProcessor); + + eventBus.publish(eventMessage); + } + +} diff --git a/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessageSourceTest.java b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessageSourceTest.java new file mode 100644 index 0000000..73ababd --- /dev/null +++ b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessageSourceTest.java @@ -0,0 +1,126 @@ +/* + * Copyright (c) 2010-2021. Axon Framework + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.axonframework.extensions.springcloud.eventhandling; + +import org.axonframework.eventhandling.EventMessage; +import org.axonframework.eventhandling.GenericEventMessage; +import org.axonframework.extensions.springcloud.eventhandling.configuration.MessageSourceTestConfiguration; +import org.axonframework.extensions.springcloud.eventhandling.configuration.SourceTestChannels; +import org.axonframework.extensions.springcloud.eventhandling.converter.DefaultSpringMessageConverter; +import org.axonframework.extensions.springcloud.eventhandling.converter.SpringMessageConverter; +import org.axonframework.extensions.springcloud.eventhandling.utils.TestSerializer; +import org.axonframework.messaging.Headers; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit4.SpringRunner; + +import java.util.List; +import java.util.function.Consumer; + +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.*; + +/** + * Test class validating the {@link SpringStreamMessageSource}. + * + * @author Mehdi Chitforoosh + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = MessageSourceTestConfiguration.class) +@DirtiesContext +class SpringStreamMessageSourceTest { + + @Autowired + private SourceTestChannels testChannels; + + private SpringMessageConverter converter; + + private SpringStreamMessageSource testSubject; + + @BeforeEach + void setUp() { + converter = DefaultSpringMessageConverter.builder() + .serializer(TestSerializer.secureXStreamSerializer()) + .build(); + testSubject = SpringStreamMessageSource.builder() + .converter(converter) + .build(); + testChannels.input().subscribe(testSubject); + } + + @AfterEach + void clean() { + testChannels.input().unsubscribe(testSubject); + } + + @Test + void testMessageListenerInvokesAllEventProcessors() { + Consumer>> eventProcessor = mock(Consumer.class); + String payload = "testMessage"; + EventMessage eventMessage = GenericEventMessage.asEventMessage(payload); + + testSubject.subscribe(eventProcessor); + Message message = converter.createSpringMessage(eventMessage); + + testChannels.output().send(message); + + verify(eventProcessor).accept(argThat(item -> item.size() == 1 && item.get(0).getPayload().equals(payload))); + } + + @Test + void testMessageListenerIgnoredOnUnsupportedMessageType() { + Consumer>> eventProcessor = mock(Consumer.class); + String payload = "testMessage"; + EventMessage eventMessage = GenericEventMessage.asEventMessage(payload); + + testSubject.subscribe(eventProcessor); + Message message = converter.createSpringMessage(eventMessage); + + Message messageWithoutHeader = MessageBuilder.fromMessage(message) + .removeHeader(Headers.MESSAGE_TYPE) + .build(); + + testChannels.output().send(messageWithoutHeader); + + verify(eventProcessor, never()).accept(any(List.class)); + } + + @Test + void testMessageListenerInvokedOnUnknownSerializedType() { + Consumer>> eventProcessor = mock(Consumer.class); + String payload = "testMessage"; + EventMessage eventMessage = GenericEventMessage.asEventMessage(payload); + + testSubject.subscribe(eventProcessor); + Message message = converter.createSpringMessage(eventMessage); + + Message messageWithoutHeader = MessageBuilder.fromMessage(message) + .setHeader(Headers.MESSAGE_TYPE, "unknown") + .build(); + + testChannels.output().send(messageWithoutHeader); + + verify(eventProcessor).accept(any(List.class)); + } +} diff --git a/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/MessageProcessorTestConfiguration.java b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/MessageProcessorTestConfiguration.java new file mode 100644 index 0000000..f7e67ae --- /dev/null +++ b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/MessageProcessorTestConfiguration.java @@ -0,0 +1,18 @@ +package org.axonframework.extensions.springcloud.eventhandling.configuration; + +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.integration.annotation.Transformer; + + +@SpringBootApplication +@EnableBinding(Processor.class) +public class MessageProcessorTestConfiguration { + + @Transformer(inputChannel = Processor.OUTPUT, outputChannel = Processor.INPUT) + public Object transformAsIs(Object test) { + return test; + } + +} diff --git a/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/MessagePublisherTestConfiguration.java b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/MessagePublisherTestConfiguration.java new file mode 100644 index 0000000..64dc30a --- /dev/null +++ b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/MessagePublisherTestConfiguration.java @@ -0,0 +1,27 @@ +package org.axonframework.extensions.springcloud.eventhandling.configuration; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.context.annotation.Bean; +import org.springframework.integration.annotation.Transformer; + +@SpringBootApplication +@EnableBinding(PublisherTestChannels.class) +public class MessagePublisherTestConfiguration { + + // To convert Java 8 Instant Date/Time in event message to Json in Spring message + @Bean + public ObjectMapper objectMapper() { + ObjectMapper mapper = new ObjectMapper() + .registerModule(new JavaTimeModule()); + return mapper; + } + + @Transformer(inputChannel = PublisherTestChannels.INPUT, outputChannel = PublisherTestChannels.OUTPUT) + public Object transformAsIs(Object test) { + return test; + } + +} diff --git a/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/MessageSourceTestConfiguration.java b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/MessageSourceTestConfiguration.java new file mode 100644 index 0000000..16d68b7 --- /dev/null +++ b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/MessageSourceTestConfiguration.java @@ -0,0 +1,16 @@ +package org.axonframework.extensions.springcloud.eventhandling.configuration; + +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.integration.annotation.Transformer; + +@SpringBootApplication +@EnableBinding(SourceTestChannels.class) +public class MessageSourceTestConfiguration { + + @Transformer(inputChannel = SourceTestChannels.OUTPUT, outputChannel = SourceTestChannels.INPUT) + public Object transformAsIs(Object test) { + return test; + } + +} diff --git a/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/PublisherTestChannels.java b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/PublisherTestChannels.java new file mode 100644 index 0000000..3d5a79f --- /dev/null +++ b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/PublisherTestChannels.java @@ -0,0 +1,19 @@ +package org.axonframework.extensions.springcloud.eventhandling.configuration; + +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.annotation.Output; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.SubscribableChannel; + +public interface PublisherTestChannels { + + String OUTPUT = "publisherOutputChannel"; + + @Output(OUTPUT) + MessageChannel output(); + + String INPUT = "publisherInputChannel"; + + @Input(INPUT) + SubscribableChannel input(); +} diff --git a/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/SourceTestChannels.java b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/SourceTestChannels.java new file mode 100644 index 0000000..8244483 --- /dev/null +++ b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/SourceTestChannels.java @@ -0,0 +1,19 @@ +package org.axonframework.extensions.springcloud.eventhandling.configuration; + +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.annotation.Output; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.SubscribableChannel; + +public interface SourceTestChannels { + + String OUTPUT = "sourceOutputChannel"; + + @Output(OUTPUT) + MessageChannel output(); + + String INPUT = "sourceInputChannel"; + + @Input(INPUT) + SubscribableChannel input(); +} diff --git a/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/converter/SpringMessageConverterTest.java b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/converter/SpringMessageConverterTest.java new file mode 100644 index 0000000..0488134 --- /dev/null +++ b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/converter/SpringMessageConverterTest.java @@ -0,0 +1,190 @@ +/* + * Copyright (c) 2010-2021. Axon Framework + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.axonframework.extensions.springcloud.eventhandling.converter; + +import org.axonframework.eventhandling.DomainEventMessage; +import org.axonframework.eventhandling.EventMessage; +import org.axonframework.eventhandling.GenericDomainEventMessage; +import org.axonframework.eventhandling.GenericEventMessage; +import org.axonframework.extensions.springcloud.eventhandling.utils.TestSerializer; +import org.axonframework.messaging.Headers; +import org.axonframework.messaging.MetaData; +import org.axonframework.serialization.Serializer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +import static org.axonframework.common.DateTimeUtils.formatInstant; +import static org.junit.jupiter.api.Assertions.*; + +/** + * Test class validating the {@link SpringMessageConverter}. + * + * @author Mehdi Chitforoosh + */ +class SpringMessageConverterTest { + + private final Serializer serializer = TestSerializer.secureXStreamSerializer(); + + private SpringMessageConverter testSubject; + + @BeforeEach + void setUp() { + testSubject = DefaultSpringMessageConverter.builder() + .serializer(serializer) + .build(); + } + + @Test + void testSpringMessageIfIsNull() { + assertFalse(testSubject.readSpringMessage(null).isPresent()); + } + + @Test + void testEventMessageIfNotAxonMessageIdPresent() { + String payload = "testMessage"; + EventMessage eventMessage = GenericEventMessage.asEventMessage(payload) + .withMetaData(MetaData.with("key", "value")); + + Message message = testSubject.createSpringMessage(eventMessage); + Message messageWithoutHeader = MessageBuilder.fromMessage(message) + .removeHeader(Headers.MESSAGE_ID) + .build(); + + assertFalse(testSubject.readSpringMessage(messageWithoutHeader).isPresent()); + } + + @Test + void testEventMessageIfNotAxonMessageTypePresent() { + String payload = "testMessage"; + EventMessage eventMessage = GenericEventMessage.asEventMessage(payload) + .withMetaData(MetaData.with("key", "value")); + + Message message = testSubject.createSpringMessage(eventMessage); + Message messageWithoutHeader = MessageBuilder.fromMessage(message) + .removeHeader(Headers.MESSAGE_TYPE) + .build(); + + assertFalse(testSubject.readSpringMessage(messageWithoutHeader).isPresent()); + } + + @Test + void testPayloadAfterReadingSpringMessage() { + String payload = "testMessage"; + EventMessage eventMessage = GenericEventMessage.asEventMessage(payload) + .withMetaData(MetaData.with("key", "value")); + + Message message = testSubject.createSpringMessage(eventMessage); + EventMessage actualEventMessageResult = testSubject.readSpringMessage(message) + .orElseThrow(() -> new AssertionError("Expected valid message")); + + assertTrue(actualEventMessageResult.getPayload() instanceof String); + assertEquals(eventMessage.getPayload(), actualEventMessageResult.getPayload()); + + // Test domain event message + GenericDomainEventMessage domainEventMessage = + new GenericDomainEventMessage<>(payload.getClass().getName(), "testId", 1L, payload, MetaData.with("key", "value")); + + Message messageContainsDomainEventMessage = testSubject.createSpringMessage(domainEventMessage); + EventMessage actualDomainEventMessageResult = testSubject.readSpringMessage(messageContainsDomainEventMessage) + .orElseThrow(() -> new AssertionError("Expected valid message")); + + assertTrue(actualDomainEventMessageResult.getPayload() instanceof String); + assertEquals(domainEventMessage.getPayload(), actualDomainEventMessageResult.getPayload()); + } + + @Test + void testHeadersAndTimestampAfterReadingSpringMessage() { + // Test event message headers + String payload = "testMessage"; + EventMessage eventMessage = GenericEventMessage.asEventMessage(payload) + .withMetaData(MetaData.with("key", "value")); + + Message message = testSubject.createSpringMessage(eventMessage); + EventMessage actualEventMessageResult = testSubject.readSpringMessage(message) + .orElseThrow(() -> new AssertionError("Expected valid message")); + + assertEquals(eventMessage.getMetaData(), actualEventMessageResult.getMetaData()); + assertEquals(eventMessage.getTimestamp(), actualEventMessageResult.getTimestamp()); + + // Test domain event message headers + + GenericDomainEventMessage domainEventMessage = + new GenericDomainEventMessage<>(payload.getClass().getName(), "testId", 1L, payload, MetaData.with("key", "value")); + + Message messageContainsDomainEventMessage = testSubject.createSpringMessage(domainEventMessage); + EventMessage actualDomainEventMessageResult = testSubject.readSpringMessage(messageContainsDomainEventMessage) + .orElseThrow(() -> new AssertionError("Expected valid message")); + + assertEquals(domainEventMessage.getMetaData(), actualDomainEventMessageResult.getMetaData()); + assertEquals(domainEventMessage.getTimestamp(), actualDomainEventMessageResult.getTimestamp()); + } + + @Test + void testNullEventMessage() { + assertThrows(NullPointerException.class, () -> testSubject.createSpringMessage(null)); + } + + @Test + void testNullEventMessagePayload() { + EventMessage eventMessage = GenericEventMessage.asEventMessage(null); + + assertThrows(NullPointerException.class, () -> testSubject.createSpringMessage(eventMessage)); + } + + @Test + void testHeadersAfterCreatingSpringMessage() { + // Test event message headers + String payload = "testMessage"; + EventMessage eventMessage = GenericEventMessage.asEventMessage(payload) + .withMetaData(MetaData.with("key", "value")); + + Message message = testSubject.createSpringMessage(eventMessage); + + assertEquals(eventMessage.getIdentifier(), message.getHeaders().get(Headers.MESSAGE_ID)); + assertEquals(serializer.serialize(payload, payload.getClass()).getType().getName(), message.getHeaders().get(Headers.MESSAGE_TYPE)); + assertEquals(formatInstant(eventMessage.getTimestamp()), message.getHeaders().get(Headers.MESSAGE_TIMESTAMP)); + assertEquals("value", message.getHeaders().get(Headers.MESSAGE_METADATA + "-" + "key")); + + // Test domain event message headers + GenericDomainEventMessage domainEventMessage = + new GenericDomainEventMessage<>(payload.getClass().getName(), "testId", 1L, payload, MetaData.with("key", "value")); + + Message messageContainsDomainEventMessage = testSubject.createSpringMessage(domainEventMessage); + + assertEquals("value", messageContainsDomainEventMessage.getHeaders().get(Headers.MESSAGE_METADATA + "-" + "key")); + assertEquals(domainEventMessage.getIdentifier(), messageContainsDomainEventMessage.getHeaders().get(Headers.MESSAGE_ID)); + assertEquals(serializer.serialize(payload, payload.getClass()).getType().getName(), messageContainsDomainEventMessage.getHeaders().get(Headers.MESSAGE_TYPE)); + assertEquals(formatInstant(domainEventMessage.getTimestamp()), messageContainsDomainEventMessage.getHeaders().get(Headers.MESSAGE_TIMESTAMP)); + assertEquals(((DomainEventMessage) domainEventMessage).getAggregateIdentifier(), messageContainsDomainEventMessage.getHeaders().get(Headers.AGGREGATE_ID)); + assertEquals(((DomainEventMessage) domainEventMessage).getSequenceNumber(), messageContainsDomainEventMessage.getHeaders().get(Headers.AGGREGATE_SEQ)); + assertEquals(((DomainEventMessage) domainEventMessage).getType(), messageContainsDomainEventMessage.getHeaders().get(Headers.AGGREGATE_TYPE)); + } + + @Test + void testPayloadAfterCreatingSpringMessage() { + String payload = "testMessage"; + EventMessage eventMessage = GenericEventMessage.asEventMessage(payload); + + Message message = testSubject.createSpringMessage(eventMessage); + + assertTrue(message.getPayload() instanceof String); + + assertEquals(serializer.serialize(payload, payload.getClass()).getData(), message.getPayload()); + } +} diff --git a/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/utils/TestSerializer.java b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/utils/TestSerializer.java new file mode 100644 index 0000000..27135b7 --- /dev/null +++ b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/utils/TestSerializer.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2010-2021. Axon Framework + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.axonframework.extensions.springcloud.eventhandling.utils; + +import com.thoughtworks.xstream.XStream; +import org.axonframework.serialization.xml.XStreamSerializer; + +/** + * Utility providing {@link org.axonframework.serialization.Serializer} instances for testing. + * + * @author Steven van Beelen + */ +public class TestSerializer { + + private TestSerializer() { + // Test utility class + } + + /** + * Return a {@link XStreamSerializer} for which the security settings have been set. + * + * @return a {@link XStreamSerializer} for which the security settings have been set. + */ + public static XStreamSerializer secureXStreamSerializer() { + XStream xStream = new XStream(); + xStream.setClassLoader(TestSerializer.class.getClassLoader()); + xStream.allowTypesByWildcard(new String[]{"org.axonframework.**"}); + XStream.setupDefaultSecurity(xStream); + return XStreamSerializer.builder().xStream(xStream).build(); + } +} From 4fc97983ac95023c2d328291a03252a9b96aec5a Mon Sep 17 00:00:00 2001 From: Mehdi Chitforoosh Date: Sun, 18 Apr 2021 18:29:52 +0430 Subject: [PATCH 7/9] Edit pom.xml --- pom.xml | 1 + springcloud-streams/pom.xml | 24 +++++++++++++++++++++--- springcloud/pom.xml | 15 ++------------- 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/pom.xml b/pom.xml index 085b714..24b108e 100644 --- a/pom.xml +++ b/pom.xml @@ -53,6 +53,7 @@ 4.4.7 Hoxton.SR10 + Horsham.SR11 5.3.4 5.3.4 diff --git a/springcloud-streams/pom.xml b/springcloud-streams/pom.xml index efafcde..ec71d96 100644 --- a/springcloud-streams/pom.xml +++ b/springcloud-streams/pom.xml @@ -50,13 +50,31 @@ spring-integration-core ${spring-integration.version} + + org.springframework.cloud + spring-cloud-stream + + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + true + - junit - junit - 4.11 + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + ${jackson.version} test + + + org.springframework.cloud + spring-cloud-stream-test-support + test + + diff --git a/springcloud/pom.xml b/springcloud/pom.xml index 21a9c88..03e7eef 100644 --- a/springcloud/pom.xml +++ b/springcloud/pom.xml @@ -15,7 +15,8 @@ ~ limitations under the License. --> - + 4.0.0 org.axonframework.extensions.springcloud @@ -66,18 +67,6 @@ true - - org.springframework - spring-messaging - ${spring-messaging.version} - - - - org.springframework.integration - spring-integration-core - ${spring-integration.version} - - com.fasterxml.jackson.core jackson-databind From ce7914eada202bff4792ae41a1b0a06f60b1d46c Mon Sep 17 00:00:00 2001 From: Mehdi Chitforoosh Date: Wed, 21 Apr 2021 09:57:08 +0430 Subject: [PATCH 8/9] Add an integration test with Spring Integration configurations. --- ...sageSourceAndPublisherIntegrationTest.java | 69 +++++++++++++++++++ ...geSourceAndPublisherTestConfiguration.java | 69 +++++++++++++++++++ .../SourceAndPublisherTestChannels.java | 19 +++++ 3 files changed, 157 insertions(+) create mode 100644 springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessageSourceAndPublisherIntegrationTest.java create mode 100644 springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/MessageSourceAndPublisherTestConfiguration.java create mode 100644 springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/SourceAndPublisherTestChannels.java diff --git a/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessageSourceAndPublisherIntegrationTest.java b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessageSourceAndPublisherIntegrationTest.java new file mode 100644 index 0000000..34c681b --- /dev/null +++ b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/SpringStreamMessageSourceAndPublisherIntegrationTest.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2010-2021. Axon Framework + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.axonframework.extensions.springcloud.eventhandling; + +import org.axonframework.eventhandling.EventBus; +import org.axonframework.eventhandling.EventMessage; +import org.axonframework.eventhandling.GenericEventMessage; +import org.axonframework.extensions.springcloud.eventhandling.configuration.MessageSourceAndPublisherTestConfiguration; +import org.axonframework.messaging.MetaData; +import org.junit.jupiter.api.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit4.SpringRunner; + +import java.util.List; +import java.util.function.Consumer; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * @author Mehdi Chitforoosh + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = MessageSourceAndPublisherTestConfiguration.class) +@DirtiesContext +class SpringStreamMessageSourceAndPublisherIntegrationTest { + + @Autowired + private EventBus eventBus; + + @Autowired + private SpringStreamMessageSource messageSource; + + @Test + void testMessageListener() { + String payload = "testMessage"; + EventMessage eventMessage = GenericEventMessage.asEventMessage(payload) + .withMetaData(MetaData.with("key", "value")); + + Consumer>> eventProcessor = receivedEventMessages -> { + EventMessage em = receivedEventMessages.get(0); + assertEquals(payload, em.getPayload()); + assertEquals(eventMessage.getIdentifier(), em.getIdentifier()); + assertEquals(eventMessage.getTimestamp(), em.getTimestamp()); + assertEquals("value", em.getMetaData().get("key")); + }; + + messageSource.subscribe(eventProcessor); + + eventBus.publish(eventMessage); + } + +} diff --git a/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/MessageSourceAndPublisherTestConfiguration.java b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/MessageSourceAndPublisherTestConfiguration.java new file mode 100644 index 0000000..3bfee34 --- /dev/null +++ b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/MessageSourceAndPublisherTestConfiguration.java @@ -0,0 +1,69 @@ +package org.axonframework.extensions.springcloud.eventhandling.configuration; + +import org.axonframework.eventhandling.EventBus; +import org.axonframework.eventhandling.SimpleEventBus; +import org.axonframework.extensions.springcloud.eventhandling.SpringStreamMessagePublisher; +import org.axonframework.extensions.springcloud.eventhandling.SpringStreamMessageSource; +import org.axonframework.extensions.springcloud.eventhandling.converter.DefaultSpringMessageConverter; +import org.axonframework.extensions.springcloud.eventhandling.converter.SpringMessageConverter; +import org.axonframework.extensions.springcloud.eventhandling.utils.TestSerializer; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.context.annotation.Bean; +import org.springframework.integration.annotation.Transformer; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.dsl.IntegrationFlows; + + +@SpringBootApplication +@EnableBinding(SourceAndPublisherTestChannels.class) +public class MessageSourceAndPublisherTestConfiguration { + + @Bean + public EventBus eventBus() { + return SimpleEventBus.builder() + .build(); + } + + @Bean + public SpringMessageConverter converter() { + return DefaultSpringMessageConverter.builder() + .serializer(TestSerializer.secureXStreamSerializer()) + .build(); + } + + @Bean + public SpringStreamMessageSource source(EventBus eventBus, SpringMessageConverter converter) { + return SpringStreamMessageSource.builder() + .converter(converter) + .build(); + } + + @Bean + public SpringStreamMessagePublisher publisher(EventBus eventBus, SpringMessageConverter converter) { + return SpringStreamMessagePublisher.builder() + .messageSource(eventBus) + .converter(converter) + .build(); + } + + @Bean + public IntegrationFlow publisherFlow(SpringStreamMessagePublisher publisher) { + return IntegrationFlows.from(publisher) + .channel(SourceAndPublisherTestChannels.OUTPUT) + .get(); + } + + @Bean + public IntegrationFlow sourceFlow(SpringStreamMessageSource source) { + return IntegrationFlows.from(SourceAndPublisherTestChannels.INPUT) + .handle(source) + .get(); + } + + @Transformer(inputChannel = SourceAndPublisherTestChannels.OUTPUT, outputChannel = SourceAndPublisherTestChannels.INPUT) + public Object transformAsIs(Object test) { + return test; + } + +} diff --git a/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/SourceAndPublisherTestChannels.java b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/SourceAndPublisherTestChannels.java new file mode 100644 index 0000000..e1d3396 --- /dev/null +++ b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/SourceAndPublisherTestChannels.java @@ -0,0 +1,19 @@ +package org.axonframework.extensions.springcloud.eventhandling.configuration; + +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.annotation.Output; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.SubscribableChannel; + +public interface SourceAndPublisherTestChannels { + + String OUTPUT = "sourceAndPublisherOutputChannel"; + + @Output(OUTPUT) + MessageChannel output(); + + String INPUT = "sourceAndPublisherInputChannel"; + + @Input(INPUT) + SubscribableChannel input(); +} From d5e865e2d6389949080c24ec531b411cbc54512c Mon Sep 17 00:00:00 2001 From: Mehdi Chitforoosh Date: Wed, 21 Apr 2021 10:00:41 +0430 Subject: [PATCH 9/9] Add comments --- .../MessageSourceAndPublisherTestConfiguration.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/MessageSourceAndPublisherTestConfiguration.java b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/MessageSourceAndPublisherTestConfiguration.java index 3bfee34..f41efde 100644 --- a/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/MessageSourceAndPublisherTestConfiguration.java +++ b/springcloud-streams/src/test/java/org/axonframework/extensions/springcloud/eventhandling/configuration/MessageSourceAndPublisherTestConfiguration.java @@ -35,6 +35,8 @@ public SpringMessageConverter converter() { @Bean public SpringStreamMessageSource source(EventBus eventBus, SpringMessageConverter converter) { return SpringStreamMessageSource.builder() + // Optional +// .eventBus(eventBus) .converter(converter) .build(); }