0
votes

I've gotten MQTT -> MQTT and AMQP -> AMQP to work, but the MQTT -> AMQP translation is failing somewhere. Here's my test. It passes if my "listener" also uses MQTT via Paho, but it fails with this RabbitMQ (AMQP) listener.

@SpringBootTest
@SpringJUnitConfig
internal open class ProvisioningTest @Autowired constructor(
    private val mqtt: IMqttAsyncClient,
    private val mapper: ObjectMapper
) {

    @Test
    fun provision() {
        val entity = Foley(
            rfid = UUID.randomUUID().toString(),
        )

        val called = AtomicBoolean(false)
        mqtt.subscribe("foley/created", 1) { _, _ -> called.set(true) }

        mqtt.publish("foley/new", MqttMessage(mapper.writeValueAsBytes(entity)))

        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilTrue(called)
    }
}

This is the listener that publishes the saved entity to the other queue; it never gets called when I publish to MQTT.

@Service
open class Provisioning(private val repo: FoleyRepo) {
    private val log: Logger = LogManager.getLogger(this::class.java)

    @SendTo("foley.created")
    @RabbitListener(queuesToDeclare = [Queue("foley.new")] )
    open fun listen(entity: Foley): Foley {
        log.trace("saving: {}", entity)
        val save = repo.save(entity)
        log.debug("saved: {}", save)
        return save
    }

}

The entirety of my messaging configuration:

@Configuration
open class MessagingConfig {

    @Bean
    open fun client(
        @Value("tcp://\${mqtt.client.host:localhost}:\${mqtt.client.port:1883}") uri: String,
        @Value("\${mqtt.client.user:#{null}}") user: String?,
        @Value("\${mqtt.client.pass:#{null}}") pass: CharArray?
    ): IMqttAsyncClient {

        val connOpt = MqttConnectOptions()
        user?.let { connOpt.userName = it }
        pass?.let { connOpt.password = it }
        connOpt.isCleanSession = false
        connOpt.isAutomaticReconnect = true
        val client = MqttAsyncClient(uri, MqttAsyncClient.generateClientId(), MemoryPersistence())
        client.connect(connOpt)
        return client
    }

    @Bean
    open fun messageConverter( om: ObjectMapper): MessageConverter {
        return Jackson2JsonMessageConverter(om)
    }

    @Bean
    open fun builder(): Jackson2ObjectMapperBuilderCustomizer {
        return Jackson2ObjectMapperBuilderCustomizer {
            it.modules(JavaTimeModule(), KotlinModule())
        }
    }
}

I'm using the official Docker RabbitMQ image with the MQTT plugin enabled.

What do I need to correct to make this work?

2 Answers

2
votes

The MQTT plugin publishes to the amq.topic exchange, using the MQTT topic name as the routing key (with the MQTT separator "/" translated to ".").
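To make the routing-key mapping concrete, here is a minimal sketch of the translation the plugin is documented to perform between MQTT topics and AMQP 0-9-1 routing keys. The class and method names are hypothetical, and the real logic lives inside the broker; this only illustrates the naming rule, assuming topics without dots in them.

```java
// Hypothetical helper (not part of any library): mimics how an MQTT topic
// name maps to an AMQP 0-9-1 routing key on the amq.topic exchange.
final class MqttTopics {

    // Simplified rule: the MQTT separator '/' becomes '.', and the MQTT
    // single-level wildcard '+' becomes '*'. '#' means the same in both.
    static String toRoutingKey(String mqttTopic) {
        return mqttTopic.replace('/', '.').replace('+', '*');
    }

    public static void main(String[] args) {
        System.out.println(toRoutingKey("foley/new"));      // foley.new
        System.out.println(toRoutingKey("foley/+/status")); // foley.*.status
    }
}
```

For the question's test, this means a message published to the MQTT topic "foley/new" reaches amq.topic with the routing key "foley.new".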

On the consumer side, it binds an auto-delete queue to that exchange with the same routing key; in the following example, that queue is named mqtt-subscription-mqttConsumerqos1.

In order to receive MQTT messages over AMQP, you need to bind your own queue to the exchange. Here is an example:

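import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Binding.DestinationType;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;

@SpringBootApplication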
public class So54995261Application {

    public static void main(String[] args) {
        SpringApplication.run(So54995261Application.class, args);
    }

    @Bean
    @ServiceActivator(inputChannel = "toMQTT")
    public MqttPahoMessageHandler sendIt(MqttPahoClientFactory clientFactory) {
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler("clientId", clientFactory);
        handler.setAsync(true);
        handler.setDefaultTopic("so54995261");
        return handler;
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { "tcp://localhost:1883" });
        options.setUserName("guest");
        options.setPassword("guest".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    public MessageProducerSupport mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("mqttConsumer",
                mqttClientFactory(), "so54995261");
        adapter.setCompletionTimeout(5000);
        return adapter;
    }

    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows.from(mqttInbound())
                .handle(System.out::println)
                .get();
    }

    @RabbitListener(queues = "so54995261")
    public void listen(byte[] in) {
        System.out.println(new String(in));
    }

    @Bean
    public Queue queue() {
        return new Queue("so54995261");
    }

    @Bean
    public Binding binding() {
        return new Binding("so54995261", DestinationType.QUEUE, "amq.topic", "so54995261", null);
    }

    @Bean
    public ApplicationRunner runner(MessageChannel toMQTT) {
        return args -> toMQTT.send(new GenericMessage<>("foo"));
    }

}
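Applied to the names in the question, the same idea might look like the sketch below. It assumes the MQTT topic "foley/new" arrives on amq.topic with the routing key "foley.new", and the class and bean method names are hypothetical. Treat it as a starting point under those assumptions, not a verified fix for the asker's project.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Binding.DestinationType;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; only the queue, exchange, and routing-key
// names come from the question and the answer above.
@Configuration
public class FoleyBindingConfig {

    // The durable queue that the question's @RabbitListener consumes from.
    @Bean
    public Queue foleyNewQueue() {
        return new Queue("foley.new");
    }

    // Bind that queue to amq.topic so messages published over MQTT to
    // "foley/new" (assumed routing key "foley.new") are routed into it.
    @Bean
    public Binding foleyNewBinding() {
        return new Binding("foley.new", DestinationType.QUEUE, "amq.topic", "foley.new", null);
    }
}
```

The reply direction probably needs the same treatment. In Spring AMQP, a @SendTo value without a slash is treated as a routing key on the default exchange, so @SendTo("foley.created") would not reach an MQTT subscriber on "foley/created". Something like @SendTo("amq.topic/foley.created") may be what is needed, though that is an untested assumption here.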
1
votes

@Gary Russel

The example above is simple, but I found it a bit confusing. When I first read it, I could not tell whether "so54995261" was the MQTT topic, the queue bound to the "amq.topic" exchange, or both. Using the same name for everything is what confused me.

To verify my understanding:

When I send an MQTT message to RabbitMQ from any client with, say, the MQTT topic "a-topic", the MQTT plugin takes the message and publishes it to the "amq.topic" exchange, using the MQTT topic ("a-topic" in my case) as the routing key.

Then I can create a queue, say "queue-b", and bind it to the "amq.topic" exchange with the binding key "a-topic".

RabbitMQ will then route messages with the routing key "a-topic" to "queue-b". If I listen on "queue-b", I can consume them over AMQP.
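The flow described above can be simulated in plain Java without a broker. This is only a toy model of a topic exchange: the class name is hypothetical, each queue holds a single binding key for brevity, and "queue-all" is an extra queue added purely for illustration. It implements the standard topic-exchange wildcards, where "*" matches exactly one word and "#" matches zero or more.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a topic exchange such as amq.topic (hypothetical class).
final class TopicExchangeSketch {

    // queue name -> binding key (one binding per queue, to keep it short)
    private final Map<String, String> bindings = new LinkedHashMap<>();

    void bind(String queue, String bindingKey) {
        bindings.put(queue, bindingKey);
    }

    // Returns the queues whose binding key matches the routing key.
    List<String> route(String routingKey) {
        List<String> queues = new ArrayList<>();
        String[] key = routingKey.split("\\.");
        for (Map.Entry<String, String> b : bindings.entrySet()) {
            if (matches(b.getValue().split("\\."), 0, key, 0)) {
                queues.add(b.getKey());
            }
        }
        return queues;
    }

    // Word-by-word match: '*' is exactly one word, '#' is zero or more words.
    static boolean matches(String[] pattern, int i, String[] key, int j) {
        if (i == pattern.length) {
            return j == key.length;
        }
        if (pattern[i].equals("#")) {
            for (int n = j; n <= key.length; n++) {
                if (matches(pattern, i + 1, key, n)) {
                    return true;
                }
            }
            return false;
        }
        if (j == key.length) {
            return false;
        }
        if (pattern[i].equals("*") || pattern[i].equals(key[j])) {
            return matches(pattern, i + 1, key, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        TopicExchangeSketch amqTopic = new TopicExchangeSketch();
        amqTopic.bind("queue-b", "a-topic");
        amqTopic.bind("queue-all", "#");
        System.out.println(amqTopic.route("a-topic")); // [queue-b, queue-all]
        System.out.println(amqTopic.route("other"));   // [queue-all]
    }
}
```

The point of the sketch is that the queue name plays no part in routing: "queue-b" receives the message only because its binding key matches the routing key derived from the MQTT topic.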