2
votes

Hi Axon Framework community,

I'd like to have your opinion on how to solve the following problem properly.

My Axon Test Setup

  • Two instances of the same Spring Boot application (using axon-spring-boot-starter 4.4 without Axon Server)
  • Every instance publishes the same events on a regular interval
  • Both instances are connected to the same EventSource (single SQL Server instance using JpaEventStorageEngine)
  • Every instance is configured to use TrackingEventProcessors
  • Every instances has the same event handlers registered

What I want to achieve

I'd like that events published by one instance are only handled by the very same instance

If instance1 publishes eventX then only instance1 should handle eventX

What I've tried so far

  • I can achieve the above scenario using SubscribingEventProcessor. Unfortunately this is not an option in my case, since we'd like to have the option to replay events for rebuilding / adding new query models.
  • I could assign the event handlers of every instance to differed processing groups. Unfortunately this didn't worked. Maybe because every TrackingEventProcessors instance processes the same EventStream ? - not so sure about this though.
  • I could implement a MessageHandlerInterceptor which only proceeds in case the event origin is from the same instance. This is what I implemented so far and which works properly: MessageHandlerInterceptor
class StackEventInterceptor(private val stackProperties: StackProperties) : MessageHandlerInterceptor<EventMessage<*>> {

    override fun handle(unitOfWork: UnitOfWork<out EventMessage<*>>?, interceptorChain: InterceptorChain?): Any? {
        val stackId = (unitOfWork?.message?.payload as SomeEvent).stackId
        if(stackId == stackProperties.id){
            interceptorChain?.proceed()
        }
        return null
    }
}
@Configuration
class AxonConfiguration {

    @Autowired
    fun configure(eventProcessingConfigurer: EventProcessingConfigurer, stackProperties: StackProperties) {
        val processingGroup = "processing-group-stack-${stackProperties.id}"
        eventProcessingConfigurer.byDefaultAssignTo(processingGroup)
        eventProcessingConfigurer.registerHandlerInterceptor(processingGroup) { StackEventInterceptor(stackProperties) }
    }
}

Is there a better solution ?

I have the impression that my current solution is properly not the best one, since ideally I'd like that only the event handlers which belongs to a certain instance are triggered by the TrackingEventProcessor instances.

How would you solve that ?

2

2 Answers

2
votes

Interesting scenario you're having here @thowimmer. My first hunch would be to say "use the SubscribingEventProcessor instead". However, you pointed out that that's not an option in your setup. I'd argue it's very valuable for others who're in the same scenario to know why that's not an option. So, maybe you can elaborate on that (to be honest, I am curious about that too).

Now for your problem case to ensure events are only handled within the same JVM. Adding the origin to the events is definitely a step you can take, as this allows for a logical way to filter. "Does this event originate from my.origin()?" If not, you'd just ignore the event and be done with it, simple as that. There is another way to achieve this though, to which I'll come to in a bit.

The place to filter is however what you're looking for mostly I think. But first, I'd like to specify why you need to filter in the first place. As you've noticed, the TrackingEventProcessor (TEP) streams events from a so called StreamableMessageSource. The EventStore is an implementation of such a StreamableMessageSource. As you are storing all events in the same store, well, it'll just stream everything to your TEPs. As your events are part of a single Event Stream, you are required to filter them at some stage. Using a MessageHandlerInterceptor would work, you could even go and write a HandlerEnhacnerDefinition allowing you to add additional behaviour to your Event Handling functions. However you put it though, with the current setup, filtering needs to be done somewhere. The MessageHandlerInterceptor is arguably the simplest place to do this at.

However, there is a different way of dealing with this. Why not segregate your Event Store, into two distinct instances for both applications? Apparently they do not have the need to read from one another, so why share the same Event Store at all? Without knowing further background of your domain, I'd guess you are essentially dealing with applications residing in distinct bounded contexts. Very shortly put, there is zero interest to share everything with both applications/contexts, you just share specifics portions of your domain language very consciously with one another.

Note that support for multiple contexts, using a single communication hub in the middle, is exactly what Axon Server can achieve for you. I am not here to say you cant configure this yourself though, I have done this in the past. But leaving that work to somebody or something else, freeing you from the need to configure infrastructure, that would be a massive timesaver.

Hope this helps you set the context a little of my thoughts on the matter @thowimmer.

0
votes

Sumup:

Using the same EventStore for both instances is probably no an ideal setup in case we want to use the capabilities of the TrackingEventProcessor.

Options to solve it:

  • Dedicated (not mirrored) DB instance for each application instance.
  • Using multiple contexts using AxonServer.

If we decide to solve the problem on application level filtering using MessageHandlerInterceptor is the most simplest solution.

Thanks @Steven for exchanging ideas.


EDIT:

Solution on application level using CorrelationDataProvider & MessageHandlerInterceptor by filtering out events not originated in same process.

AxonConfiguration.kt

const val METADATA_KEY_PROCESS_ID = "pid"
const val PROCESSING_GROUP_PREFIX = "processing-group-pid"

@Configuration
class AxonConfiguration {

    @Bean
    fun processIdCorrelationDataProvider() = ProcessIdCorrelationDataProvider()

    @Autowired
    fun configureProcessIdEventHandlerInterceptor(eventProcessingConfigurer: EventProcessingConfigurer) {
        val processingGroup = "$PROCESSING_GROUP_PREFIX-${ApplicationPid()}"
        eventProcessingConfigurer.byDefaultAssignTo(processingGroup)
        eventProcessingConfigurer.registerHandlerInterceptor(processingGroup) { ProcessIdEventHandlerInterceptor() }
    }
}

class ProcessIdCorrelationDataProvider() : CorrelationDataProvider {
    override fun correlationDataFor(message: Message<*>?): MutableMap<String, *> {
        return mutableMapOf(METADATA_KEY_PROCESS_ID to ApplicationPid().toString())
    }
}

class ProcessIdEventHandlerInterceptor : MessageHandlerInterceptor<EventMessage<*>> {
    override fun handle(unitOfWork: UnitOfWork<out EventMessage<*>>?, interceptorChain: InterceptorChain?) {
        val currentPid = ApplicationPid().toString()
        val originPid = unitOfWork?.message?.metaData?.get(METADATA_KEY_PROCESS_ID)
        if(currentPid == originPid){
            interceptorChain?.proceed()
        }
    }
}

See full demo project on GitHub