So, I'm working on a PoC for a low latency trading engine using axon and Spring Boot framework. Is it possible to achieve latency as low as 10 - 50ms for a single process flow? The process will include validations, orders, and risk management. I have done some initial tests on a simple app to update the order state and execute it and I'm clocking in 300ms+ in latency. Which got me curious as to how much can I optimize with Axon?
Edit:
The latency issue isn't related to Axon. Managed to get it down to ~5ms per process flow using an InMemoryEventStorageEngine
and DisruptorCommandBus
.
The flow of messages goes like this. NewOrderCommand(published from client) -> OrderCreated(published from aggregate) -> ExecuteOrder(published from saga) -> OrderExecutionRequested -> ConfirmOrderExecution(published from saga) -> OrderExecuted(published from aggregate)
Edit 2: Finally switched over to Axon Server but as expected the average latency went up to ~150ms. Axon Server was installed using Docker. How do I optimize the application using AxonServer to achieve sub-millisecond latencies moving forward? Any pointers are appreciated.
Edit 3:
@Steven, based on your suggestions I have managed to bring down the latency to an average of 10ms, this is a good start ! However, is it possible to bring it down even further? As what I am testing now is just a small process out of a series of processes to be done like validations, risk management and position tracking before finally executing the order out. All of which should be done within 5ms or less. Worse case to tolerate is 10ms(These are the updated time budget). Also, do note below in the configs that the new readings are based on an InMemorySagaStore
backed by a WeakReferenceCache
. Really appreciate the help !
OrderAggregate:
@Aggregate
internal class OrderAggregate {
@AggregateIdentifier(routingKey = "orderId")
private lateinit var clientOrderId: String
private var orderId: String = UUID.randomUUID().toString()
private lateinit var state: OrderState
private lateinit var createdAtSource: LocalTime
private val log by Logger()
constructor() {}
@CommandHandler
constructor(command: NewOrderCommand) {
log.info("received new order command")
val (orderId, created) = command
apply(
OrderCreatedEvent(
clientOrderId = orderId,
created = created
)
)
}
@CommandHandler
fun handle(command: ConfirmOrderExecutionCommand) {
apply(OrderExecutedEvent(orderId = command.orderId, accountId = accountId))
}
@CommandHandler
fun execute(command: ExecuteOrderCommand) {
log.info("execute order event received")
apply(
OrderExecutionRequestedEvent(
clientOrderId = clientOrderId
)
)
}
@EventSourcingHandler
fun on(event: OrderCreatedEvent) {
log.info("order created event received")
clientOrderId = event.clientOrderId
createdAtSource = event.created
setState(Confirmed)
}
@EventSourcingHandler
fun on(event: OrderExecutedEvent) {
val now = LocalTime.now()
log.info(
"elapse to execute: ${
createdAtSource.until(
now,
MILLIS
)
}ms. created at source: $createdAtSource, now: $now"
)
setState(Executed)
}
private fun setState(state: OrderState) {
this.state = state
}
}
OrderManagerSaga:
@Profile("rabbit-executor")
@Saga(sagaStore = "sagaStore")
class OrderManagerSaga {
@Autowired
private lateinit var commandGateway: CommandGateway
@Autowired
private lateinit var executor: RabbitMarketOrderExecutor
private val log by Logger()
@StartSaga
@SagaEventHandler(associationProperty = "clientOrderId")
fun on(event: OrderCreatedEvent) {
log.info("saga received order created event")
commandGateway.send<Any>(ExecuteOrderCommand(orderId = event.clientOrderId, accountId = event.accountId))
}
@SagaEventHandler(associationProperty = "clientOrderId")
fun on(event: OrderExecutionRequestedEvent) {
log.info("saga received order execution requested event")
try {
//execute order
commandGateway.send<Any>(ConfirmOrderExecutionCommand(orderId = event.clientOrderId))
} catch (e: Exception) {
log.error("failed to send order: $e")
commandGateway.send<Any>(
RejectOrderCommand(
orderId = event.clientOrderId
)
)
}
}
}
Beans:
@Bean
fun eventSerializer(mapper: ObjectMapper): JacksonSerializer{
return JacksonSerializer.Builder()
.objectMapper(mapper)
.build()
}
@Bean
fun commandBusCache(): Cache {
return WeakReferenceCache()
}
@Bean
fun sagaCache(): Cache {
return WeakReferenceCache()
}
@Bean
fun associationsCache(): Cache {
return WeakReferenceCache()
}
@Bean
fun sagaStore(sagaCache: Cache, associationsCache: Cache): CachingSagaStore<Any>{
val sagaStore = InMemorySagaStore()
return CachingSagaStore.Builder<Any>()
.delegateSagaStore(sagaStore)
.associationsCache(associationsCache)
.sagaCache(sagaCache)
.build()
}
@Bean
fun commandBus(
commandBusCache: Cache,
orderAggregateFactory: SpringPrototypeAggregateFactory<Order>,
eventStore: EventStore,
txManager: TransactionManager,
axonConfiguration: AxonConfiguration,
snapshotter: SpringAggregateSnapshotter
): DisruptorCommandBus {
val commandBus = DisruptorCommandBus.builder()
.waitStrategy(BusySpinWaitStrategy())
.executor(Executors.newFixedThreadPool(8))
.publisherThreadCount(1)
.invokerThreadCount(1)
.transactionManager(txManager)
.cache(commandBusCache)
.messageMonitor(axonConfiguration.messageMonitor(DisruptorCommandBus::class.java, "commandBus"))
.build()
commandBus.registerHandlerInterceptor(CorrelationDataInterceptor(axonConfiguration.correlationDataProviders()))
return commandBus
}
Application.yml:
axon:
server:
enabled: true
eventhandling:
processors:
name:
mode: tracking
source: eventBus
serializer:
general : jackson
events : jackson
messages : jackson