So, I'm working on a PoC for a low-latency trading engine using Axon and the Spring Boot framework. Is it possible to achieve latency as low as 10-50ms for a single process flow? The process will include validations, orders, and risk management. I have done some initial tests on a simple app that updates the order state and executes it, and I'm clocking in at 300ms+ of latency, which got me curious as to how much I can optimize with Axon.

Edit:
The latency issue isn't related to Axon. I managed to get it down to ~5ms per process flow using an InMemoryEventStorageEngine and a DisruptorCommandBus.

The flow of messages goes like this: NewOrderCommand (published from client) -> OrderCreated (published from aggregate) -> ExecuteOrder (published from saga) -> OrderExecutionRequested -> ConfirmOrderExecution (published from saga) -> OrderExecuted (published from aggregate)

Edit 2: I finally switched over to Axon Server, but as expected the average latency went up to ~150ms. Axon Server was installed using Docker. How do I optimize the application using Axon Server to achieve sub-millisecond latencies moving forward? Any pointers are appreciated.

Edit 3: @Steven, based on your suggestions I have managed to bring the latency down to an average of 10ms, which is a good start! However, is it possible to bring it down even further? What I am testing now is just one small process out of a series of processes, such as validations, risk management and position tracking, that have to run before the order is finally executed. All of these should be done within 5ms or less; the worst case I can tolerate is 10ms (this is the updated time budget). Also, do note in the configs below that the new readings are based on an InMemorySagaStore backed by a WeakReferenceCache. Really appreciate the help!

OrderAggregate:

@Aggregate
internal class OrderAggregate {
    @AggregateIdentifier(routingKey = "orderId")
    private lateinit var clientOrderId: String
    private var orderId: String = UUID.randomUUID().toString()
    private lateinit var state: OrderState
    private lateinit var createdAtSource: LocalTime

    private val log by Logger()

    constructor() {}

    @CommandHandler
    constructor(command: NewOrderCommand) {
        log.info("received new order command")
        val (orderId, created) = command
        apply(
                OrderCreatedEvent(
                        clientOrderId = orderId,
                        created = created
                )
        )
    }

    @CommandHandler
    fun handle(command: ConfirmOrderExecutionCommand) {
        apply(OrderExecutedEvent(orderId = command.orderId, accountId = accountId))
    }

    @CommandHandler
    fun execute(command: ExecuteOrderCommand) {
        log.info("execute order event received")
        apply(
                OrderExecutionRequestedEvent(
                        clientOrderId = clientOrderId
                )
        )
    }

    @EventSourcingHandler
    fun on(event: OrderCreatedEvent) {
        log.info("order created event received")
        clientOrderId = event.clientOrderId
        createdAtSource = event.created
        setState(Confirmed)
    }

    @EventSourcingHandler
    fun on(event: OrderExecutedEvent) {
        val now = LocalTime.now()
        log.info(
                "elapse to execute: ${
                    createdAtSource.until(
                            now,
                            MILLIS
                    )
                }ms. created at source: $createdAtSource, now: $now"
        )
        setState(Executed)
    }

    private fun setState(state: OrderState) {
        this.state = state
    }
}

OrderManagerSaga:

@Profile("rabbit-executor")
@Saga(sagaStore = "sagaStore")
class OrderManagerSaga {
    @Autowired
    private lateinit var commandGateway: CommandGateway

    @Autowired
    private lateinit var executor: RabbitMarketOrderExecutor
    private val log by Logger()

    @StartSaga
    @SagaEventHandler(associationProperty = "clientOrderId")
    fun on(event: OrderCreatedEvent) {
        log.info("saga received order created event")
        commandGateway.send<Any>(ExecuteOrderCommand(orderId = event.clientOrderId, accountId = event.accountId))
    }

    @SagaEventHandler(associationProperty = "clientOrderId")
    fun on(event: OrderExecutionRequestedEvent) {
        log.info("saga received order execution requested event")
        try {
            //execute order
            commandGateway.send<Any>(ConfirmOrderExecutionCommand(orderId = event.clientOrderId))
        } catch (e: Exception) {
            log.error("failed to send order: $e")
            commandGateway.send<Any>(
                    RejectOrderCommand(
                            orderId = event.clientOrderId
                    )
            )
        }
    }
}

Beans:

@Bean
fun eventSerializer(mapper: ObjectMapper): JacksonSerializer{
    return JacksonSerializer.Builder()
            .objectMapper(mapper)
            .build()
}

@Bean
fun commandBusCache(): Cache {
    return WeakReferenceCache()
}

@Bean
fun sagaCache(): Cache {
    return WeakReferenceCache()
}

   
@Bean
fun associationsCache(): Cache {       
    return WeakReferenceCache()
}

@Bean
fun sagaStore(sagaCache: Cache, associationsCache: Cache): CachingSagaStore<Any>{    
    val sagaStore = InMemorySagaStore()
    return CachingSagaStore.Builder<Any>()
            .delegateSagaStore(sagaStore)
            .associationsCache(associationsCache)
            .sagaCache(sagaCache)
            .build()
}

@Bean
fun commandBus(
        commandBusCache: Cache,
        orderAggregateFactory: SpringPrototypeAggregateFactory<Order>,
        eventStore: EventStore,
        txManager: TransactionManager,
        axonConfiguration: AxonConfiguration,
        snapshotter: SpringAggregateSnapshotter
): DisruptorCommandBus {  
    val commandBus = DisruptorCommandBus.builder()
            .waitStrategy(BusySpinWaitStrategy())
            .executor(Executors.newFixedThreadPool(8))
            .publisherThreadCount(1)
            .invokerThreadCount(1)
            .transactionManager(txManager)
            .cache(commandBusCache)
            .messageMonitor(axonConfiguration.messageMonitor(DisruptorCommandBus::class.java, "commandBus"))
            .build()
    commandBus.registerHandlerInterceptor(CorrelationDataInterceptor(axonConfiguration.correlationDataProviders()))
    return commandBus
}

Application.yml:

axon:
  server:
    enabled: true
  eventhandling:
    processors:
      name:
        mode: tracking
        source: eventBus
  serializer:
    general : jackson
    events : jackson
    messages : jackson
I am confident you can, as any component in Axon Framework can be configured. Furthermore, you can switch the infrastructure components to more optimized parts too. At this stage though, your question is too broad to actually recommend what you can configure. Any specifics on the flow of messages and how your application is modeled would provide further insights here. So, if you could update your question, that would be great. – Steven
Hi @Steven, I have updated my question. – David Teh
Thanks for expanding on the subject, David. – Steven

1 Answer

Your setup's description is thorough, but I think there are still some options I can recommend. They touch a number of places within the Framework, so if it's unclear where a suggestion fits or what it is for within Axon, feel free to add a comment so that I can update my response.

Now, here's the list of things I have in mind:

  • Set up snapshotting for aggregates if loading takes too long. This is configurable with the AggregateLoadTimeSnapshotTriggerDefinition.
  • Introduce a cache for your aggregate. I'd start with trying out the WeakReferenceCache. If this doesn't suffice, it would be worth investigating the EhCache and JCache adapters. Or, construct your own. The documentation has a section on Aggregate caching, by the way.
  • Introduce a cache for your saga. I'd start with trying out the WeakReferenceCache. If this doesn't suffice, it would be worth investigating the EhCache and JCache adapters. Or, construct your own. The documentation has a section on Saga caching, by the way.
  • Do you really need a Saga in this setup? The process seems simple enough that it could run within a regular Event Handling Component. If that's the case, not moving through the Saga flow will likely introduce a speed-up too.
  • Have you tried optimizing the DisruptorCommandBus? Try playing with the WaitStrategy, publisher thread count, invoker thread count and the Executor used.
  • Try out the PooledStreamingEventProcessor (PSEP, for short) instead of the TrackingEventProcessor (TEP, for short). The former provides more configuration options. The defaults already provide a higher throughput compared to the TEP, by the way. Increasing the "batch size" allows you to ingest bigger amounts of events in one go. You can also change the Executor the PSEP uses for Event retrieval work (done by the coordinator) and Event processing (the worker executor is in charge of this).
  • There are also some things you can configure on Axon Server that might increase throughput. Try out the event.events-per-segment-prefetch, the event.read-buffer-size or command-thread. There might be other options that work, so it might be worth checking out the entire list of options in the Axon Server documentation.
  • Although it's hard to deduce whether this will generate an immediate benefit, you could give the Axon Server runnable more memory / CPU. I'd start with at least 2GB of heap and 4 cores. Playing with these numbers might just help too.
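
To make the first two suggestions a bit more concrete, here is a minimal sketch of how the snapshot trigger and the aggregate cache could be wired up in Spring. The bean names (orderSnapshotTrigger, orderAggregateCache) and the 5ms load-time threshold are assumptions made for illustration, not values taken from your setup, and it assumes a recent Axon Framework 4.x release in which AggregateLoadTimeSnapshotTriggerDefinition and the cache attribute on @Aggregate are available.

```kotlin
import org.axonframework.common.caching.Cache
import org.axonframework.common.caching.WeakReferenceCache
import org.axonframework.eventsourcing.AggregateLoadTimeSnapshotTriggerDefinition
import org.axonframework.eventsourcing.SnapshotTriggerDefinition
import org.axonframework.eventsourcing.Snapshotter
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class AggregateTuningConfig {

    // Triggers a snapshot once loading an aggregate takes longer than the
    // threshold. The 5ms value is an assumption picked to match the budget
    // mentioned in the question; tune it against your own measurements.
    @Bean
    fun orderSnapshotTrigger(snapshotter: Snapshotter): SnapshotTriggerDefinition =
        AggregateLoadTimeSnapshotTriggerDefinition(snapshotter, 5L)

    // Simplest cache to start with; swap for the EhCache or JCache adapter if needed.
    @Bean
    fun orderAggregateCache(): Cache = WeakReferenceCache()
}

// The aggregate then refers to both beans by name (hypothetical bean names):
//
// @Aggregate(
//     snapshotTriggerDefinition = "orderSnapshotTrigger",
//     cache = "orderAggregateCache"
// )
// internal class OrderAggregate { ... }
```

Since you construct the DisruptorCommandBus yourself and already hand it a cache, it is worth measuring whether the extra aggregate cache changes anything in your setup before keeping it.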

There's likely more to share, but these are the things I have top of mind. Hope this helps you out somewhat, David!
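
As a rough sketch of the Saga and PooledStreamingEventProcessor suggestions combined, the component below handles the two events in a regular Event Handling Component and assigns it to a pooled streaming processor. It assumes Axon Framework 4.5 or later. The message classes are hypothetical stand-ins reduced to the fields the sketch needs, and the processing group name "order-manager" and the batch size of 64 are assumptions as well.

```kotlin
import org.axonframework.commandhandling.gateway.CommandGateway
import org.axonframework.config.EventProcessingConfigurer
import org.axonframework.config.ProcessingGroup
import org.axonframework.eventhandling.EventHandler
import org.axonframework.modelling.command.TargetAggregateIdentifier
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Configuration
import org.springframework.stereotype.Component

// Hypothetical stand-ins for the message types from the question.
data class OrderCreatedEvent(val clientOrderId: String)
data class OrderExecutionRequestedEvent(val clientOrderId: String)
data class ExecuteOrderCommand(@field:TargetAggregateIdentifier val orderId: String)
data class ConfirmOrderExecutionCommand(@field:TargetAggregateIdentifier val orderId: String)

// Plain event handler taking over the two steps the Saga performed.
// No Saga instance is created, loaded or stored for each order.
@Component
@ProcessingGroup("order-manager")
class OrderManager(private val commandGateway: CommandGateway) {

    @EventHandler
    fun on(event: OrderCreatedEvent) {
        commandGateway.send<Any>(ExecuteOrderCommand(orderId = event.clientOrderId))
    }

    @EventHandler
    fun on(event: OrderExecutionRequestedEvent) {
        // Execute the order against the market here, then confirm it.
        commandGateway.send<Any>(ConfirmOrderExecutionCommand(orderId = event.clientOrderId))
    }
}

@Configuration
class OrderManagerProcessorConfig {

    // Runs the "order-manager" group on a PooledStreamingEventProcessor
    // instead of the default TrackingEventProcessor.
    @Autowired
    fun configure(processing: EventProcessingConfigurer) {
        processing.registerPooledStreamingEventProcessor("order-manager")
        processing.registerPooledStreamingEventProcessorConfiguration(
            "order-manager",
            EventProcessingConfigurer.PooledStreamingProcessorConfiguration { _, builder ->
                // Batch size is an assumed starting point, not a recommendation.
                builder.batchSize(64)
            }
        )
    }
}
```

The trade-off is that a plain handler keeps no per-order state, so this only fits as long as the process does not need to remember anything between the two events.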