I've gotten MQTT -> MQTT and AMQP -> AMQP to work, but the MQTT -> AMQP translation seems to be failing somewhere. Here's my test: it passes if my "listener" also uses MQTT via Paho, but it fails with this RabbitMQ implementation.
// Integration test: publishes a new Foley over MQTT and waits for any message on "foley/created".
// NOTE(review): this snippet appears truncated — the Foley(...) call, the function body, and the
// class body are not closed in the visible text.
internal open class ProvisioningTest @Autowired constructor(
private val mqtt: IMqttAsyncClient,
private val mapper: ObjectMapper
) {
fun provision() {
// Entity under test, given a random UUID string as its rfid.
val entity = Foley(
rfid = UUID.randomUUID().toString(),
// Flag set to true by the subscription callback when a message arrives on "foley/created".
val called = AtomicBoolean(false)
mqtt.subscribe("foley/created", 1) { _, _ -> called.set(true) }
// Publish the entity, serialized to bytes by the ObjectMapper, to the "foley/new" topic.
mqtt.publish("foley/new", MqttMessage(mapper.writeValueAsBytes(entity)))
// Wait up to 10 seconds for the callback to flip the flag; otherwise the await fails.
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilTrue(called)
This is the listener that publishes the saved entity to the other queue; it never gets called when I publish to MQTT.
// AMQP listener that receives a Foley, logs it, and returns the value bound to `save`.
// NOTE(review): this snippet appears truncated — the getLogger(...) argument, the queue name,
// the right-hand side of `val save =`, and the closing braces are missing from the visible text.
open class Provisioning(private val repo: FoleyRepo) {
private val log: Logger = LogManager.getLogger(
// NOTE(review): the queue name is an empty string here and no exchange binding is visible.
// RabbitMQ's MQTT plugin publishes to a topic exchange (amq.topic by default), mapping the MQTT
// topic "foley/new" to routing key "foley.new"; a queue with no binding to that exchange would
// presumably never receive MQTT-published messages — confirm the binding.
@RabbitListener(queuesToDeclare = [Queue("")] )
open fun listen(entity: Foley): Foley {
log.trace("saving: {}", entity)
// presumably the result of saving `entity` through `repo` — the expression is missing here; TODO confirm
val save =
log.debug("saved: {}", save)
return save
Here is the entirety of my messaging configuration:
// Messaging configuration: an MQTT async client, a Jackson-based message converter, and an
// ObjectMapper builder customizer.
// NOTE(review): this snippet appears truncated — closing braces (and possibly annotations) are
// missing from the visible text.
open class MessagingConfig {
// Creates an MQTT async client for the injected broker URI with optional credentials.
open fun client(
// NOTE(review): the first placeholder `\${}` is empty — presumably a host property; confirm.
@Value("tcp://\${}:\${mqtt.client.port:1883}") uri: String,
@Value("\${mqtt.client.user:#{null}}") user: String?,
@Value("\${mqtt.client.pass:#{null}}") pass: CharArray?
): IMqttAsyncClient {
val connOpt = MqttConnectOptions()
// Credentials are set on the options only when the injected values are non-null.
user?.let { connOpt.userName = it }
pass?.let { connOpt.password = it }
connOpt.isCleanSession = false
connOpt.isAutomaticReconnect = true
// The client id is generated on each call and persistence is in-memory.
val client = MqttAsyncClient(uri, MqttAsyncClient.generateClientId(), MemoryPersistence())
// NOTE(review): connOpt is never passed to the client in the visible code (no connect call is
// shown) — confirm whether a line is missing from this snippet.
return client
// Returns a JSON message converter built on the supplied ObjectMapper.
open fun messageConverter( om: ObjectMapper): MessageConverter {
return Jackson2JsonMessageConverter(om)
// Customizes the Jackson ObjectMapper builder with the JavaTimeModule and KotlinModule modules.
open fun builder(): Jackson2ObjectMapperBuilderCustomizer {
return Jackson2ObjectMapperBuilderCustomizer {
it.modules(JavaTimeModule(), KotlinModule())
I'm using the official Docker RabbitMQ image with the MQTT plugin enabled.
What do I need to correct to make this work?