1
votes

we have Axon application that stores new Order. For each order state change (OrderStateChangedEvent) it plans couple of tasks. The tasks are triggered and proceeded by yet another Saga (TaskSaga - out of scope of the question)

When I delete the projection database, but leave the event store, then run the application again, the events are replayed (what is correct), but the tasks are duplicated.

I suppose this is because the OrderStateChangedEvent triggers new set of ScheduleTaskCommand each time.

Since I'm new in Axon, can't figure out how to avoid this duplication.

Event store running on AxonServer

Spring boot application autoconfigures the axon stuff

Projection database contains the projection tables and the axon tables: token_entry saga_entry association_value_entry

I suppose all the events are replayed because by recreating the database, the Axon tables are gone (hence no record about last applied event)

Am I missing something?

  • should the token_entry/saga_entry/association_value_entry tables be part of the DB for the projection tables on each application node?
  • I thought that the event store might be replayed onto new application node's db any time without changing the event history so I can run as many nodes as I wish. Or I can remove the projection dB any time and run the application, what causes that the events are projected to the fresh db again. Or this is not true?
  • In general, my problem is that one event produces command leading to new events (duplicated) produced. Should I avoid this "chaining" of events to avoid duplication?

THANKS!

Axon configuration:

@Configuration
public class AxonConfig {

    @Bean
    public EventSourcingRepository<ApplicationAggregate> applicationEventSourcingRepository(EventStore eventStore) {
        return EventSourcingRepository.builder(ApplicationAggregate.class)
                        .eventStore(eventStore)
                        .build();
    }

    @Bean
    public SagaStore sagaStore(EntityManager entityManager) {
        return JpaSagaStore.builder().entityManagerProvider(new SimpleEntityManagerProvider(entityManager)).build();
    }
}
  1. CreateOrderCommand received by Order aggregate (method fromCommand just maps 1:1 command to event)
    @CommandHandler
    public OrderAggregate(CreateOrderCommand cmd) {
        apply(OrderCreatedEvent.fromCommand(cmd))
                .andThenApply(() -> OrderStateChangedEvent.builder()
                        .applicationId(cmd.getOrderId())
                        .newState(OrderState.NEW)
                        .build());
    }
  1. Order aggregate sets the properties
    @EventSourcingHandler
    protected void on(OrderCreatedEvent event) {
        id = event.getOrderId();

        // ... additional properties set

    }

    @EventSourcingHandler
    protected void on(OrderStateChangedEvent cmd) {
        this.state = cmd.getNewState();
    }
  1. OrderStateChangedEvent is listened by Saga that schedules couple of tasks for the order of the particular state
    private Map<String, TaskStatus> tasks = new HashMap<>();

    private OrderState orderState;

    @StartSaga
    @SagaEventHandler(associationProperty = "orderId")
    public void on(OrderStateChangedEvent event) {
        orderState = event.getNewState();
        List<OrderStateAwareTaskDefinition> tasksByState = taskService.getTasksByState(orderState);
        if (tasksByState.isEmpty()) {
            finishSaga(event.getOrderId());
        }

        tasksByState.stream()
                .map(task -> ScheduleTaskCommand.builder()
                        .orderId(event.getOrderId())
                        .taskId(IdentifierFactory.getInstance().generateIdentifier())
                        .targetState(orderState)
                        .taskName(task.getTaskName())
                        .build())
                .peek(command -> tasks.put(command.getTaskId(), SCHEDULED))
                .forEach(command -> commandGateway.send(command));
    }
2
Did any of the answer resolve your problem @Tomáš Mika? Would be beneficial to other readers if you can share your course of action or if the answer resolved your problem.Steven

2 Answers

2
votes

I think I can help you in this situation. So, this happens because the TrackingToken used by the TrackingEventProcessor which supplies all the events to your Saga instances is initialized to the beginning of the event stream. Due to this the TrackingEventProcessor will start from the beginning of time, thus getting all your commands dispatched for a second time.

There are a couple of things you could do to resolve this.

  1. You could, instead of wiping the entire database, only wipe the projection tables and leave the token table intact.
  2. You could configure the initialTrackingToken of a TrackingEventProcessor to start at the head of the event stream instead of the tail.

Option 1 would work out find, but requires some delegation from the operations perspective. Option 2 leaves it in the hands of a developer, potentially a little safer than the other solution.

To adjust the token to start at the head, you can instantiate a TrackingEventProcessor with a TrackingEventProcessorConfiguration:

    EventProcessingConfigurer configurer;

    TrackingEventProcessorConfiguration trackingProcessorConfig =
            TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
                                               .andInitialTrackingToken(StreamableMessageSource::createHeadToken);

    configurer.registerTrackingEventProcessor("{class-name-of-saga}Processor", 
                                              Configuration::eventStore, 
                                              c -> trackingProcessorConfig);

You'd thus create the desired configuration for your Saga and call the andInitialTrackingToken() function and ensuring the creation of a head token of no token is present.

I hope this helps you out Tomáš!

1
votes

Steven's solution works like a charm but only in Sagas. For those who want to achieve the same effect but in classic @EventHandler (to skip executions on replay) there is a way. First you have to find out how your tracking event processor is named - I found it in AxonDashboard (8024 port on running AxonServer) - usually it is location of a component with @EventHandler annotation (package name to be precise). Then add configuration as Steven indicated in his answer.

    @Autowired
    public void customConfig(EventProcessingConfigurer configurer) {
        // This prevents from replaying some events in @EventHandler
        var trackingProcessorConfig = TrackingEventProcessorConfiguration
                .forSingleThreadedProcessing()
                .andInitialTrackingToken(StreamableMessageSource::createHeadToken);
        configurer.registerTrackingEventProcessor("com.domain.notreplayable",
                org.axonframework.config.Configuration::eventStore,
                c -> trackingProcessorConfig);
    }