
So I've gotten MQTT -> MQTT and AMQP -> AMQP to work; the translation of MQTT -> AMQP doesn't seem to be working somewhere though. Here's my test, it passes if my "listener" is also in MQTT using paho, but this rabbitmq implementation doesn't.

internal open class ProvisioningTest @Autowired constructor(
    private val mqtt: IMqttAsyncClient,
    private val mapper: ObjectMapper
) {

    fun provision() {
        val entity = Foley(
            rfid = UUID.randomUUID().toString(),

        val called = AtomicBoolean(false)
        mqtt.subscribe("foley/created", 1) { _, _ -> called.set(true) }

        mqtt.publish("foley/new", MqttMessage(mapper.writeValueAsBytes(entity)))

        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilTrue(called)

this is the listener that publishes the saved entity to the other queue; it never gets called when I publish to MQTT.

open class Provisioning(private val repo: FoleyRepo) {
    private val log: Logger = LogManager.getLogger(this::class.java)

    @RabbitListener(queuesToDeclare = [Queue("foley.new")] )
    open fun listen(entity: Foley): Foley {
        log.trace("saving: {}", entity)
        val save = repo.save(entity)
        log.debug("saved: {}", save)
        return save


the entirety of my messaging configuration

open class MessagingConfig {

    open fun client(
        @Value("tcp://\${mqtt.client.host:localhost}:\${mqtt.client.port:1883}") uri: String,
        @Value("\${mqtt.client.user:#{null}}") user: String?,
        @Value("\${mqtt.client.pass:#{null}}") pass: CharArray?
    ): IMqttAsyncClient {

        val connOpt = MqttConnectOptions()
        user?.let { connOpt.userName = it }
        pass?.let { connOpt.password = it }
        connOpt.isCleanSession = false
        connOpt.isAutomaticReconnect = true
        val client = MqttAsyncClient(uri, MqttAsyncClient.generateClientId(), MemoryPersistence())
        return client

    open fun messageConverter( om: ObjectMapper): MessageConverter {
        return Jackson2JsonMessageConverter(om)

    open fun builder(): Jackson2ObjectMapperBuilderCustomizer {
        return Jackson2ObjectMapperBuilderCustomizer {
            it.modules(JavaTimeModule(), KotlinModule())

using the official docker rabbitmq image with mqtt enabled.

What do I need to correct to make this work?


2 Answers


The MQTT plugin publishes to the amq.topic with the mqtt topic name as the routing key.

On the consumer side, it binds an auto-delete queue to that exchange, with the routing key; in the following example, the queue is named mqtt-subscription-mqttConsumerqos1.

In order to receive MQTT messages over AMQP, you need to bind your own queue to the exchange. Here is an example:

public class So54995261Application {

    public static void main(String[] args) {
        SpringApplication.run(So54995261Application.class, args);

    @ServiceActivator(inputChannel = "toMQTT")
    public MqttPahoMessageHandler sendIt(MqttPahoClientFactory clientFactory) {
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler("clientId", clientFactory);
        return handler;

    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { "tcp://localhost:1883" });
        return factory;

    public MessageProducerSupport mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("mqttConsumer",
                mqttClientFactory(), "so54995261");
        return adapter;

    public IntegrationFlow flow() {
        return IntegrationFlows.from(mqttInbound())

    @RabbitListener(queues = "so54995261")
    public void listen(byte[] in) {
        System.out.println(new String(in));

    public Queue queue() {
        return new Queue("so54995261");

    public Binding binding() {
        return new Binding("so54995261", DestinationType.QUEUE, "amq.topic", "so54995261", null);

    public ApplicationRunner runner(MessageChannel toMQTT) {
        return args -> toMQTT.send(new GenericMessage<>("foo"));


@Gary Russel

The example above is very simple even though it is a bit confusing. The reason is when I first read the example I could not figure out, is "so54995261" mqtt topic or queue binded to exchange which named "amq.topic" or both of them. Because of same naming it confused me.

Verification for what I understand,

When I send mqtt message with let's say mqtt topic that named "a-topic" via any client to rabbitmq, mqtt plugin takes message and publishes to exchange that named "amq.topic" with the mqtt topic that I sent in my case it is "a-topic" as a routing key.

Then, I can create queue let's say "queue-b" and bind "queue-b" to "amq.topic" exchange with "a-topic" binding key

And, rabbitmq will route with the "a-topic" routing key to "queue-b". If I listen "queue-b", I will be able to consume via amqp