1
votes

I am trying to use Lenses MQTT source connector [https://docs.lenses.io/connectors/source/mqtt.html] with confluent kafka v5.4.

Following is my MQTT source connector properties file:

connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
connect.mqtt.clean=false
key.converter.schemas.enable=false
connect.mqtt.timeout=1000
value.converter.schemas.enable=false
name=kmd-source-4
connect.mqtt.kcql=INSERT INTO kafka-source-topic-2   SELECT * FROM ctt/+/+/location WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter` WITHKEY(id)
value.converter=org.apache.kafka.connect.json.JsonConverter
connect.mqtt.service.quality=1
key.converter=org.apache.kafka.connect.json.JsonConverter
connect.mqtt.hosts=tcp://ip:1883
connect.mqtt.converter.throw.on.error=true
connect.mqtt.username=username
connect.mqtt.password=password
errors.log.include.messages=true
errors.log.enable=true

I am publishing messages from UI based MQTT client MQTT fx to MQTT topic 'ctt/+/+/location' and subscribing those messages on the kafka topic 'kafka-source-topic-2'.I am using Rabbit MQ as my MQTT broker and my confluent platform and RabbitMQ are on different VMs. I do not think using RabbitMQ broker instead of Mosquitto MQTT should be a problem. Whatever and whenever I publish from MQTT fx I successfully see the messages in the MQTT fx upon subscription. I had also set up confleunt MongoDB source connector and it works seamlessly.

But my problem is - the messages published on MQTT topic are available on the mapped kafka topic in an intermittent manner. What could be the reason? I do not see any error messages in kafka connect logs. Are there any connection related properties with respect to MQTT broker that I need to specify in my MQTT source properties file? Are there any properties to be included for sure in Rabbit MQ broker? Has anyone used Lenses MQTT source and sink connectors and would like to suggest anything about them?

1

1 Answers

1
votes

Your connect.mqtt.timeout is only 1 second?!? Intermittent messages suggests to me that your connector is timing out and has to re-establish its connection, and while its busy doing that, MQTT messages are coming in but not making it to the connector as it is not subscribed to the broker at that instance. Try increasing your timeout to something like 60000 (1 minute) and see what happens. Is there any reason you need it to timeout? RabbitMQ can handle connections that stay open for long periods of time with no traffic.