I am trying to use the Lenses MQTT source connector [https://docs.lenses.io/connectors/source/mqtt.html] with Confluent Kafka v5.4.
The following is my MQTT source connector properties file:
connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
connect.mqtt.clean=false
key.converter.schemas.enable=false
connect.mqtt.timeout=1000
value.converter.schemas.enable=false
name=kmd-source-4
connect.mqtt.kcql=INSERT INTO kafka-source-topic-2 SELECT * FROM ctt/+/+/location WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter` WITHKEY(id)
value.converter=org.apache.kafka.connect.json.JsonConverter
connect.mqtt.service.quality=1
key.converter=org.apache.kafka.connect.json.JsonConverter
connect.mqtt.hosts=tcp://ip:1883
connect.mqtt.converter.throw.on.error=true
connect.mqtt.username=username
connect.mqtt.password=password
errors.log.include.messages=true
errors.log.enable=true
I am publishing messages from MQTT.fx, a UI-based MQTT client, to MQTT topics matching 'ctt/+/+/location' and consuming those messages from the Kafka topic 'kafka-source-topic-2'. I am using RabbitMQ as my MQTT broker, and my Confluent Platform and RabbitMQ are on different VMs. I do not think using RabbitMQ as the broker instead of Mosquitto should be a problem. Whatever and whenever I publish from MQTT.fx, I successfully see the messages in MQTT.fx when I subscribe there. I have also set up the Confluent MongoDB source connector, and it works seamlessly.
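To make clear which topics I expect the filter in my KCQL to pick up, here is a minimal Python sketch of how I understand MQTT single-level (`+`) and multi-level (`#`) wildcard matching. The function name `topic_matches` and the sample topics such as `ctt/device1/gps/location` are made up for illustration and are not part of the connector or of my setup. The sketch also ignores special cases such as topics starting with `$`.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if a concrete MQTT topic matches a subscription filter.

    Illustrative only: handles '+' and a trailing '#', nothing else.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for i, level in enumerate(filter_levels):
        if level == "#":
            # '#' is only valid as the last level and matches everything after it
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            # the filter has more levels than the topic
            return False
        if level != "+" and level != topic_levels[i]:
            # a literal level must match exactly; '+' matches any single level
            return False

    # without '#', the filter and the topic must have the same number of levels
    return len(filter_levels) == len(topic_levels)


if __name__ == "__main__":
    kcql_filter = "ctt/+/+/location"
    # hypothetical topics, only to show which shapes match the filter
    for candidate in (
        "ctt/device1/gps/location",
        "ctt/device1/location",
        "ctt/a/b/c/location",
    ):
        print(candidate, topic_matches(kcql_filter, candidate))
```

If this understanding is right, only topics with exactly two levels between `ctt` and `location` should reach the Kafka topic, so a message published with a different number of levels would never show up there.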
My problem is that the messages published on the MQTT topic appear on the mapped Kafka topic only intermittently. What could be the reason? I do not see any error messages in the Kafka Connect logs. Are there any connection-related properties for the MQTT broker that I need to specify in my MQTT source properties file? Are there any properties that must be set on the RabbitMQ broker? Has anyone used the Lenses MQTT source and sink connectors, and could you suggest anything about them?