Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish events to Spring Cloud Stream channels #5

Open
wants to merge 12 commits into
base: master
Choose a base branch
from

Conversation

mehdichitforoosh
Copy link

We can integrate Axon EventBus with Spring Integration and Spring Cloud Stream with these configurations.
As a Message Producer:

    @Bean
    public IntegrationFlow flow(EventBus eventBus) {
        return IntegrationFlows.from(new AxonInboundChannelAdapter(eventBus))
                .channel(Source.OUTPUT)
                .get();
    }

As a Consumer:

 @Bean
  public IntegrationFlow flow(EventBus eventBus) {
      return IntegrationFlows.from(Sink.INPUT)
              .handle(new AxonOutboundChannelAdapter(eventBus))
              .get();
  }

  @Autowired
  public void configure(EventProcessingConfigurer config) {
      config.usingSubscribingEventProcessors();
  }

Sink.INPUT and Source.OUTPUT are MessageChannels from Spring Cloud Stream.

@mehdichitforoosh mehdichitforoosh changed the title publish events to spring cloud stream Publish events to spring cloud stream Apr 12, 2019
@mehdichitforoosh mehdichitforoosh changed the title Publish events to spring cloud stream Publish events to Spring Cloud Stream channels Apr 12, 2019
@smcvb smcvb requested review from abuijze and smcvb May 13, 2019 12:03
Copy link
Member

@idugalic idugalic left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potentially this can be valuable extension.
I'm not sure if this is the right place for it. Maybe, it can be in a dedicated Github Repo.

It would be great if we could try other binders, like Kafka for example. I'm afraid that Spring Cloud Stream is not allowing to programmatically set the seek/offset at the moment. This implies that we will not be able to utilize Axon StreamableMessageSource and Tracking processors to scale better.

@smcvb
Copy link
Member

smcvb commented Jan 15, 2021

@mehdichitforoosh, are you up for adjusting your PR towards a different format?
As it stands, I think there's value in having an option to connect to Spring Cloud Streams for events.
However, I don't think this should be contained inside the module you've dropped it in right now.

In essence, this extension has been set up to tap into Spring Cloud Discovery for Command Message distribution.
What you are providing is a solution to tap into Spring Cloud Stream for Event Message distribution.

That would thus opt for the current code to be moved to an axon-springcloud-discovery module, whereas your code would reside in an axon-springcloud-streams module.
If you want to take it to that end, please let us know.
In absence of response, which might be imaginable after such a long timeslot, we'll likely pick up your code and do this ourselves eventually.

@mehdichitforoosh
Copy link
Author

mehdichitforoosh commented Mar 8, 2021

Hi @smcvb.
Sorry for my late reply.
Yes,I want to complete this PR.
What changes should I make?

@smcvb
Copy link
Member

smcvb commented Mar 10, 2021

That's amazing to hear @mehdichitforoosh!
As first stabs to revamp this PR, I'd like you to do the following:

  1. Add a distinct module to this project, dedicated to this solution. Calling the package springcloud-streams and the artifact axon-springcloud-streams would clarify this is different from the existing Spring Cloud Discovery implementation.
  2. Repoint this PR towards the most recent version of the extension. This might be some effort, true, but it would be most reasonable to think of releasing this with a recent version of the extension. Pointing your PR to master and merging it with the current changes in master should be sufficient to achieve this.

@mehdichitforoosh
Copy link
Author

mehdichitforoosh commented Mar 10, 2021

Ok dear Steven @smcvb
Well, I will definitely do it and I will inform you in the next few days.
Thanks.

@CLAassistant
Copy link

CLAassistant commented Mar 19, 2021

CLA assistant check
All committers have signed the CLA.

@mehdichitforoosh mehdichitforoosh changed the base branch from axon-springcloud-4.1.x to master March 19, 2021 08:24
@mehdichitforoosh
Copy link
Author

dear @smcvb
I added changes.
What should I do for the next step?

@smcvb
Copy link
Member

smcvb commented Mar 24, 2021

Thanks for the effort so far @mehdichitforoosh, much appreciated.
For now, I'd like to ask you to do the following things:

  • Add descriptive class-level Javadoc to any of the classes you have added. When doing so, don't forget to adjust the @author (with your preferred name, of course) and @since (pointing towards 4.5, for now) tags correctly.
  • Using "...Axon..." in any of the class names shouldn't be necessary I think. As long as the implementation describes the intent of that class in the translation from Spring-Streams to Axon's idea of Events, Event Processors and Message Sources, we should be good. The AxonProcessorMessageHandler to me for example sounds like a SpringStreamsMessageSource.
  • Any of the copyright notices should stretch towards 2021.
  • A lot of the classes seem to be duplicated right now. Make sure only a single instance of each is present.
  • I am missing unit test classes for the provided code. We definitely need these prior to approve this PR.
  • For components like these, a short sample implementation through an integration test would be highly beneficial. This would ensure the process works as desired, but can also serve as a sample to others who want to use this extension.

Likely there are more fine-grained comments to go over too, but let's first take a stab at the above.
Again, thanks for the effort so far @mehdichitforoosh!
If your time doesn't allow to proceed with this, I think there might be an option that we'll (at AxonIQ) take it over.
Just let us know whether you prefer that.

@mehdichitforoosh
Copy link
Author

Thanks @smcvb for your attention.
I will finish this PR soon :-)

@mehdichitforoosh
Copy link
Author

mehdichitforoosh commented Apr 18, 2021

Hi Steven @smcvb
I think this PR is completed.

  1. I added Javadoc
  2. Rename and refactor all classes
  3. Add copyright texts
  4. Removed duplicate classes
  5. Unit test classes with Junit5, Mockito and Spring cloud stream test support
    Thanks for your patience.
    Tell me if there is a problem in latest commits.
    Good luck.

@mehdichitforoosh
Copy link
Author

mehdichitforoosh commented May 9, 2021

Hi @smcvb @idugalic @abuijze .
Did you check the latest commits?
Please Tell me if there is a problem with the code.
Thank you.

@idugalic idugalic self-requested a review May 9, 2021 19:08
this.messageSource = builder.messageSource;
this.converter = builder.converter;
if (builder.eventBus != null) {
this.messageProcessors.addAll(singletonList(builder.eventBus::publish));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should add EventBus by default here? @smcvb what do you think? In my opinion, we could leave this decision to the users of the API, but I am afraid that in that case, we are missing something like MultiSubscribableMessageSource.java allowing them to subscribe to multiple sources: eventBus/eventStore, streamSource, ...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly, I think this is a bad idea.
As you point @idugalic, we should have a form of MultiSubscribableMessageSource.

Simply publishing the Spring Messages as new events on the local EventBus is very likely going to cause a lot of issues.
Let alone that it leaks the concerns of another service's Events into the domain connecting to it.

@idugalic
Copy link
Member

idugalic commented May 9, 2021

Hi @mehdichitforoosh. Thank you for this PR. Much appreciated! Conceptually this looks very good!

It is important to note that this extension module is supporting SubscribableMessageSource only. The subscribable stream leaves all the ordering specifics in the hands of brokers (Kafka, for example), which means the events should be published on a consistent partition to ensure ordering. I am totally fine with this, and this is not a blocker or stopper in my opinion. To control events of a certain group to be placed in a dedicated partition, based on an aggregate identifier, for example, the message converter's SequencingPolicy can be utilized. StreamableMessageSource could be considered, but it is not something I would push now (let's let the brokers do the job).

@mehdichitforoosh
Copy link
Author

Hi @idugalic Thank you for your comments.

Copy link
Member

@smcvb smcvb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, I will not proceed with the integration tests yet.
Got a bunch of comments here, lots about JavaDoc. My main concerns are the usage of the EventBus in the message source directly and the seeming duplication with the SpringStreamMessageProcessor and SpringStreamMessageSource

And, I shouldn't forget to extend my apologies for the late review.
Times have been hectic here @mehdichiforoosh, and sadly there weren't a lot of people we could spare on the other subjects we're working on.
I hope this review doesn't come too late for you..`

pom.xml Outdated

<spring.version>5.3.4</spring.version>
<spring-messaging.version>5.3.4</spring-messaging.version>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there an actual need to have a separate spring-messaging and spring-integration property, or could we tag along with spring.version?

@@ -0,0 +1,100 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright (c) 2010-2020. Axon Framework
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The notice should extend to 2021, as this commit is from 2021.

* @author Mehdi chitforoosh
* @since 4.5
*/
public class SpringStreamMessageProcessor extends AbstractMessageProducingHandler implements SubscribableMessageSource<EventMessage<?>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we call it the SpringStreamMessageSource instead of the SpringStreamMessageProcessor?

Calling it a processor to closely aligns with Axon's Event Processor logic.
Hence, I assume it will confuse users.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, I notice there already is a SpringStreamMessageSource. On top of that, it also extends the AbstractMessageProducingHandler class and implements the SubscribableMessageSource.
Would you mind explaining why both exist in this pull request?

import java.util.Optional;

/**
* Interface describing a mechanism that converts Spring Messages from an Axon Messages and vice versa.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Interface describing a mechanism that converts Spring Messages from an Axon Messages and vice versa.
* Interface describing a mechanism that converts a Spring {@code org.springframework.messaging.Message} from an Axon {@link org.axonframework.eventhandling.EventMessage} and vice versa.

public interface SpringMessageConverter {

/**
* Creates an Spring Message from given {@code eventMessage}.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Creates an Spring Message from given {@code eventMessage}.
* Creates a Spring {@code org.springframework.messaging.Message} from the given {@code eventMessage}.

Comment on lines +48 to +58
/**
* Instantiate a {@link SpringStreamMessageSource} based on the fields contained in the {@link SpringStreamMessageSource.Builder}.
* The {@link EventBus} is a <b>hard requirement</b> and thus should be provided.
* The {@link SpringMessageConverter} is a <b>hard requirement</b> and thus should be provided.
* <p>
* Will validate that the {@link EventBus} and {@link SpringMessageConverter} are not {@code null}, and will throw an
* {@link AxonConfigurationException} if for either of them this holds.
*
* @param builder the {@link SpringStreamMessageSource.Builder} used to instantiate a {@link SpringStreamMessageSource} instance
*/
protected SpringStreamMessageSource(SpringStreamMessageSource.Builder builder) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the builder is an inner class, you do not have to define the root class in the JavaDoc too:

Suggested change
/**
* Instantiate a {@link SpringStreamMessageSource} based on the fields contained in the {@link SpringStreamMessageSource.Builder}.
* The {@link EventBus} is a <b>hard requirement</b> and thus should be provided.
* The {@link SpringMessageConverter} is a <b>hard requirement</b> and thus should be provided.
* <p>
* Will validate that the {@link EventBus} and {@link SpringMessageConverter} are not {@code null}, and will throw an
* {@link AxonConfigurationException} if for either of them this holds.
*
* @param builder the {@link SpringStreamMessageSource.Builder} used to instantiate a {@link SpringStreamMessageSource} instance
*/
protected SpringStreamMessageSource(SpringStreamMessageSource.Builder builder) {
/**
* Instantiate a {@link SpringStreamMessageSource} based on the fields contained in the {@link Builder}.
* <p>
* Will validate that the {@link SpringMessageConverter} is not {@code null}, and will throw an
* {@link AxonConfigurationException} if for either of them this holds.
*
* @param builder the {@link Builder} used to instantiate a {@link SpringStreamMessageSource} instance
*/
protected SpringStreamMessageSource(Builder builder) {

*/
public class SpringStreamMessageSource extends AbstractMessageProducingHandler implements SubscribableMessageSource<EventMessage<?>> {

private final CopyOnWriteArrayList<Consumer<List<? extends EventMessage<?>>>> messageProcessors = new CopyOnWriteArrayList<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to call these eventProcessors as they are consumers of the EventMessage type.

Comment on lines +89 to +90
Optional<EventMessage<?>> optional = converter.readSpringMessage(message);
optional.ifPresent(eventMessage -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Why not chain the outcome of the converter directly with the ifPresent invocation?

* @param eventBus The messageSource to subscribe
* @return the current Builder instance, for fluent interfacing
*/
public SpringStreamMessageSource.Builder eventBus(EventBus eventBus) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove the EventBus from the builder, as described earlier in the constructor.

* @param converter The converter to convert messages
* @return the current Builder instance, for fluent interfacing
*/
public SpringStreamMessageSource.Builder converter(SpringMessageConverter converter) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we're in the Builder here, we do not have to define the root class:

Suggested change
public SpringStreamMessageSource.Builder converter(SpringMessageConverter converter) {
public Builder converter(SpringMessageConverter converter) {

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@smcvb No Problem.
Thank you for your reviews.
I'll check your comments and commit again in a few days.
Good luck.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure thing @mehdichitforoosh! I'll aim to get you an earlier follow-up review this time around 👍

@smcvb smcvb removed the request for review from abuijze October 28, 2021 12:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants