0
votes

I'm using Axon Framework (4.1) with aggregates in one module (JVM, container) and projections/Sagas in another module. What I want to do is to have a distributed application taking advantage of CQRS but without Event Sourcing.

It is rather trivial to set up, and everything works as expected in a single application. The problem arises when several independent modules (across separate JVMs) are involved. Out of the box, the Axon starter uses tracking processors connected to AxonServerEventStore, which provides "location transparency" when it comes to listening to events across different JVMs.

In my case, I don't want any infrastructure for persisting or tracking the events. I just want to distribute the events from my aggregates to any subscribing processors (SEPs) in a fire-and-forget style, just like AxonServerQueryBus does when distributing scatter-gather queries, for example.

If I just declare all processors as subscribing as follows:

    @Autowired
    public void configureEventSubscribers(EventProcessingConfigurer configurer) {
        configurer.usingSubscribingEventProcessors();
    }

events are reaching all @EventHandler methods in the same JVM, but events are not reaching any handlers in other JVMs anymore. If my understanding is correct, then, Axon Server will distribute the events across JVMs for tracking processors only (TEPs).

Obviously, what I can do is use an external message broker (RabbitMQ, Kafka) in combination with SpringAMQPMessageSource (as in the docs) to distribute events to all subscribers through something like a fanout exchange in RabbitMQ. This works, but it requires me to maintain the broker myself.
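To make the fanout idea concrete, here is a minimal plain-Java sketch. It does not use Axon or RabbitMQ; `LocalBus` and `FanoutExchange` are hypothetical stand-ins for a per-JVM event bus and a broker exchange, and the two "JVMs" are simulated inside one process.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class FanoutSketch {

    /** Hypothetical stand-in for the event bus inside one JVM: delivers only to local handlers. */
    static final class LocalBus {
        private final List<Consumer<String>> handlers = new ArrayList<>();

        void subscribe(Consumer<String> handler) {
            handlers.add(handler);
        }

        void publish(String event) {
            handlers.forEach(handler -> handler.accept(event));
        }
    }

    /** Hypothetical stand-in for a fanout exchange: copies each event to every bound bus. */
    static final class FanoutExchange {
        private final List<LocalBus> boundBuses = new ArrayList<>();

        void bind(LocalBus bus) {
            boundBuses.add(bus);
        }

        void send(String event) {
            boundBuses.forEach(bus -> bus.publish(event));
        }
    }

    static List<String> demo() {
        List<String> received = new ArrayList<>();
        LocalBus commandSide = new LocalBus();
        LocalBus projectionSide = new LocalBus();
        commandSide.subscribe(event -> received.add("command-jvm:" + event));
        projectionSide.subscribe(event -> received.add("projection-jvm:" + event));

        // Without a bridge, only handlers on the publishing side see the event.
        commandSide.publish("OrderPlaced");

        // With the exchange in place, local events are also forwarded to the other side.
        FanoutExchange exchange = new FanoutExchange();
        exchange.bind(projectionSide);
        commandSide.subscribe(exchange::send);
        commandSide.publish("OrderShipped");
        return received;
    }
}
```

In this sketch, "OrderPlaced" is seen only on the command side, while "OrderShipped" reaches both sides once the exchange is bound.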

What would be nice is to have Axon Server taking care of this just like it takes care of distributing commands and queries (this would give me one less infrastructure piece to care about).

As a side note, I've actually managed to distribute the events to projections using QueryBus and passing events as payloads to GenericQueryMessage sent as scatter-gather queries. Needless to say, this is not a robust solution. But it goes to demonstrate that there is nothing inherently impossible with Axon Server distributing events (just another type of a message, after all) to SEPs or TEPs indifferently.

Finally, the questions:

1) What is the community's recommendation for pure CQRS (without Event Sourcing) using Axon when it comes to location transparency and distributing the events?

2) Is it possible to make Axon Server to distribute events to SEPs across JVMs (eliminating the need for an external message broker)?

2 Answers

2
votes

Note on Event Sourcing

From Axon Framework's perspective, Event Sourcing is solely a concern of your Command Model. This stance is taken because Event Sourcing defines the recreation of a model through the events it has published. A Query Model, however, does not react to commands by publishing events that change its state; it simply listens to (distributed) events to update its state so that it can be queried by others. As such, the framework only thinks about Event Sourcing when it recreates your Aggregates, by providing the EventSourcingRepository.

The Event Processor's job is to be the "mechanical aspect of providing events to your Event Handlers". This relates to the Q part in CQRS, to recreating the Query Model. Thus, the Framework does not regard Event Processors to be part of the notion of Event Sourcing.

Answer to your scenario

I do want to emphasize that if you are distributing your application by running several instances of a given app, you will very likely need to have a way to ensure a given event is only handled once.

This is one of the concerns a Tracking Event Processor (TEP) addresses, and it does so by using a Tracking Token. The Tracking Token essentially acts as a marker defining which events have been processed. In addition, a given TEP's thread must hold a claim on a token to be able to work, which ensures a given event is not handled twice.
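The claim mechanism can be illustrated with a small plain-Java simulation. This is a sketch under simplifying assumptions, not Axon's TokenStore API: `TokenClaimSketch`, its methods, and the node and processor names are all hypothetical, and the token is reduced to an index into an in-memory event list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a shared token store; NOT Axon's TokenStore API.
class TokenClaimSketch {

    private static final class Entry {
        int lastProcessed = -1; // index of the last handled event, -1 = nothing handled yet
        String owner;           // node currently holding the claim, null = unclaimed
    }

    private final Map<String, Entry> tokens = new HashMap<>();

    /** Grants the claim only if the token is unclaimed or already owned by this node. */
    synchronized boolean claim(String processor, String node) {
        Entry entry = tokens.computeIfAbsent(processor, name -> new Entry());
        if (entry.owner != null && !entry.owner.equals(node)) {
            return false;
        }
        entry.owner = node;
        return true;
    }

    synchronized void release(String processor, String node) {
        Entry entry = tokens.get(processor);
        if (entry != null && node.equals(entry.owner)) {
            entry.owner = null;
        }
    }

    /** Handles all not-yet-processed events if this node holds the claim; returns how many. */
    synchronized int process(String processor, String node, List<String> eventLog, List<String> handled) {
        if (!claim(processor, node)) {
            return 0;
        }
        Entry entry = tokens.get(processor);
        int count = 0;
        for (int i = entry.lastProcessed + 1; i < eventLog.size(); i++) {
            handled.add(node + ":" + eventLog.get(i));
            entry.lastProcessed = i; // advance the token past the handled event
            count++;
        }
        return count;
    }

    static List<String> demo() {
        TokenClaimSketch store = new TokenClaimSketch();
        List<String> eventLog = new ArrayList<>(Arrays.asList("A", "B", "C"));
        List<String> handled = new ArrayList<>();
        store.process("projection", "node-1", eventLog, handled); // node-1 handles A, B, C
        store.process("projection", "node-2", eventLog, handled); // claim refused, nothing handled
        store.release("projection", "node-1");
        eventLog.add("D");
        store.process("projection", "node-2", eventLog, handled); // node-2 resumes after C
        return handled;
    }
}
```

Because the position and the claim live in one shared place, the second node neither processes events concurrently nor re-handles A, B, and C when it takes over.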

In conclusion, you will need to define infrastructure to store Tracking Tokens to be able to distribute the event load, essentially opting against the use of the SubscribingEventProcessor entirely.

However, whether the above is an issue does depend on your application landscape. Maybe you aren't duplicating a given application at all, and thus effectively not duplicating a given Tracking Event Processor. In this case, you can fulfill your request to "not track events" whilst still using Tracking Event Processors. All you have to do is ensure you are not storing the tokens. The interface used for storing tokens is the TokenStore, for which an in-memory version exists.

Using the InMemoryTokenStore in a default Axon setup will, however, mean you'll technically be replaying your events every time the application starts. This occurs because the default "initial Tracking Token" points at the beginning of the event stream. This is, of course, also configurable, for which I'd suggest you use the following approach:

// Creating the configuration for a TEP that starts at the head (newest end) of the event stream
TrackingEventProcessorConfiguration tepConfig =
    TrackingEventProcessorConfiguration
        .forSingleThreadedProcessing() // Note: could also be multi-threaded
        .andInitialTrackingToken(StreamableMessageSource::createHeadToken);

// Registering as default TEP config; 'configurer' is your EventProcessingConfigurer instance,
// e.g. the one injected into the @Autowired configuration method
configurer.registerTrackingEventProcessorConfiguration(config -> tepConfig);

This should set you up to use TEP, without the necessity to set up infrastructure to store Tokens. Note however, this will require you not to duplicate the given application.
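The effect of the initial token on a processor that has no stored token can be shown with a tiny plain-Java simulation. This is only a sketch: `InitialTokenSketch` and `eventsSeenAfterRestart` are hypothetical names, and the event stream is modelled as a simple list rather than an Axon message source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation of a processor restarting without a persisted token; not Axon code.
class InitialTokenSketch {

    /**
     * Returns the events a freshly started processor would handle.
     * startAtHead = false models a token at the beginning of the stream (full replay),
     * startAtHead = true models a head token (only events published after startup).
     */
    static List<String> eventsSeenAfterRestart(List<String> existingEvents,
                                               List<String> publishedAfterStart,
                                               boolean startAtHead) {
        int startIndex = startAtHead ? existingEvents.size() : 0;
        List<String> stream = new ArrayList<>(existingEvents);
        stream.addAll(publishedAfterStart);
        return new ArrayList<>(stream.subList(startIndex, stream.size()));
    }

    public static void main(String[] args) {
        List<String> existing = Arrays.asList("e1", "e2", "e3");
        List<String> later = Arrays.asList("e4");
        System.out.println(eventsSeenAfterRestart(existing, later, false)); // full replay
        System.out.println(eventsSeenAfterRestart(existing, later, true));  // new events only
    }
}
```

With an in-memory token store the token is lost on every restart, so the choice of initial token decides whether a restart triggers a full replay or simply picks up new events.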


I'd like to end with the following question you've posted:

Is it possible to make Axon Server to distribute events to SEPs across JVMs (eliminating the need for an external message broker)?

As you have correctly noted, SEPs are (currently) only usable for subscribing to events which have been published within a given JVM. Axon Server does not (yet) have a mechanism to bridge events from one JVM to another for the purpose of allowing distributed Subscribing Event Processing. I am (as part of AxonIQ), however, relatively sure we will look into this in the future. If such a feature is important to the successful conclusion of your project, I suggest contacting AxonIQ directly.

0
votes

If you are considering Apache Kafka for this use case, you might want to look into kalium.alkal.io. It will make your code much simpler:

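MyObject myObject = ....
kalium.post(myObject); // sends POJOs/protobufs using the Kafka Producer

// On the consumer side: this uses a deserializer with the Kafka Consumer API.
// The lambda parameter is named differently from the local variable above,
// since Java does not allow a lambda parameter to shadow a local variable.
kalium.on(MyObject.class, received -> {
   // do something with the received object
}, "consumer_group");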