0
votes

My Quarkus microservice is using the AMQP connector in the smallrye reactive messaging library to produce messages to the ActiveMQ Artemis broker running from the vromero/activemq-artemis:2.16.0-alpine Docker image. The reactive messaging library documentation mentions the possibility to use dynamic address names. I'm using following (Kotlin) code in my REST resource:

    @Inject
    @Channel("task-finished")
    lateinit var taskFinishedEmitter: MutinyEmitter<String>

    @POST
    @Produces(MediaType.TEXT_PLAIN)
    fun doSomethingAndInform(@RestForm customerId: String): Uni<String> {

       // leaving out the actual messageText computation...

       val messageText: String = "DUMMY MESSAGE"
       val metadata: OutgoingAmqpMetadata = OutgoingAmqpMetadata.builder()
          .withDurable(true)
          .withCorrelationId(customerId)
          .withAddress("anycast://my-custom-address")
          .build()

       val message: Message<String> = Message.of(messageText,
            {
                logger.info("message acked")
                CompletableFuture.completedFuture(null)
            },
            {
                logger.info("message nacked: {}", it.message)
                CompletableFuture.completedFuture(null)
            }
       )

       taskFinishedEmitter.send(message.addMetadata(metadata))
       return Uni.createFrom().item("DONE")
    }

The connector is defined in the application.properties:

amqp-host=localhost
amqp-port=5672
amqp-username=adm
amqp-password=***

mp.messaging.outgoing.task-finished.connector=smallrye-amqp

ActiveMQ Artemis indeed creates the my-custom-address address dynamically however it doesn't create any queue bound to it and the message ends up being unrouted.

The broker.xml config file contains in the core section


<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true;anycastPrefix=anycast://</acceptor>
<address-setting match="#">
   <auto-create-queues>true</auto-create-queues>
</address-settings>

I tried passing the queue name along with the address

.withAddress("anycast://my-custom-address::my-queue")

but it didn't make any difference.

What is missing for the queue being created programmatically and the message being delivered to it? Also why does Artemis acknowledge the message when it gets lost (unrouted)?

Update: attaching a screenshot from the Artemis web interface enter image description here

1

1 Answers

1
votes

By default ActiveMQ Artemis will treat messages sent by AMQP clients as multicast (i.e. pub/sub). The semantics of multicast dictate that every message published to the broker will be dispatched to every subscriber. However, since there are no subscribers the message is simply discarded (i.e. unrouted).

Since you're prefixing your address with anycast:// you should configure that prefix on the "amqp" acceptor in broker.xml using the anycastPrefix parameter, e.g.:

<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true;anycastPrefix=anycast://</acceptor>

You could also change the default routing types used for auto-created resources, e.g.:

<address-setting match="my-custom-address">
   <default-address-routing-type>ANYCAST</default-address-routing-type>
   <default-queue-routing-type>ANYCAST</default-queue-routing-type>
</address-settings>