My Quarkus microservice uses the AMQP connector from the SmallRye Reactive Messaging library to produce messages to an ActiveMQ Artemis broker running from the vromero/activemq-artemis:2.16.0-alpine Docker image. The Reactive Messaging documentation mentions that dynamic address names can be used. I'm using the following (Kotlin) code in my REST resource:
@Inject
@Channel("task-finished")
lateinit var taskFinishedEmitter: MutinyEmitter<String>

@POST
@Produces(MediaType.TEXT_PLAIN)
fun doSomethingAndInform(@RestForm customerId: String): Uni<String> {
    // leaving out the actual messageText computation...
    val messageText: String = "DUMMY MESSAGE"
    val metadata: OutgoingAmqpMetadata = OutgoingAmqpMetadata.builder()
        .withDurable(true)
        .withCorrelationId(customerId)
        .withAddress("anycast://my-custom-address")
        .build()
    val message: Message<String> = Message.of(messageText,
        {
            logger.info("message acked")
            CompletableFuture.completedFuture(null)
        },
        {
            logger.info("message nacked: {}", it.message)
            CompletableFuture.completedFuture(null)
        }
    )
    taskFinishedEmitter.send(message.addMetadata(metadata))
    return Uni.createFrom().item("DONE")
}
The connector is defined in application.properties:
amqp-host=localhost
amqp-port=5672
amqp-username=adm
amqp-password=***
mp.messaging.outgoing.task-finished.connector=smallrye-amqp
ActiveMQ Artemis does create the my-custom-address address dynamically; however, it doesn't create any queue bound to it, and the message ends up unrouted.
The core section of the broker.xml config file contains:
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true;anycastPrefix=anycast://</acceptor>
<address-setting match="#">
    <auto-create-queues>true</auto-create-queues>
</address-setting>
I also tried passing the queue name along with the address:
.withAddress("anycast://my-custom-address::my-queue")
but it didn't make any difference.
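To make clear what the two address strings above are meant to express, here is a minimal sketch of how I understand their layout: an optional routing prefix (matching the anycastPrefix on the acceptor), the address name, and an optional queue name after "::". The names AddressParts and splitAddress are hypothetical helpers written only for this illustration; this is not how the connector or the broker actually parses the string.

```kotlin
// Hypothetical helper, for illustration only: it shows how the address strings
// used above are laid out. It is NOT the connector's or the broker's parser.
data class AddressParts(
    val routingPrefix: String?, // e.g. "anycast://", or null when absent
    val address: String,        // the address name itself
    val queue: String?          // queue name from the "address::queue" form, or null
)

fun splitAddress(raw: String, anycastPrefix: String = "anycast://"): AddressParts {
    // The acceptor is configured with anycastPrefix=anycast://, so a leading
    // "anycast://" is meant as a routing-type hint, not as part of the name.
    val hasPrefix = raw.startsWith(anycastPrefix)
    val rest = if (hasPrefix) raw.removePrefix(anycastPrefix) else raw
    // "address::queue" names both the address and a specific queue on it.
    val pieces = rest.split("::", limit = 2)
    return AddressParts(
        routingPrefix = if (hasPrefix) anycastPrefix else null,
        address = pieces[0],
        queue = pieces.getOrNull(1)
    )
}

fun main() {
    println(splitAddress("anycast://my-custom-address"))
    println(splitAddress("anycast://my-custom-address::my-queue"))
}
```

In both variants I expect the broker to see the address my-custom-address with anycast routing; the second variant additionally names the queue my-queue.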
What is missing for the queue to be created programmatically and the message to be delivered to it? Also, why does Artemis acknowledge the message when it ends up unrouted (lost)?
Update: attaching a screenshot from the Artemis web interface