I have an MQTT broker and a Kafka broker running, and I am using the Kafka connector from https://github.com/Landoop/stream-reactor with the following configuration:
name=Mqtt-Source
connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=1
connect.mqtt.kcql=INSERT INTO test SELECT * FROM + WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter` WITHKEY(id)
connect.mqtt.connection.clean=true
connect.mqtt.connection.timeout=1000
connect.mqtt.connection.keep.alive=1000
connect.mqtt.client.id=test_mqtt_connector
connect.mqtt.converter.throw.on.error=true
connect.mqtt.hosts=tcp://mqtt-broker:1883
connect.mqtt.service.quality=1
In the KCQL, I'm defining the field of the message that Kafka should use as the key. Is there any way to use the MQTT topic as the key instead, so that I don't need to define WITHKEY() in the KCQL?
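
To make the intent concrete, here is a minimal sketch in plain Python of the two keying behaviours I am comparing. It does not use the connector's API; the function names, the topic, and the payload are hypothetical and only illustrate what I would like the resulting Kafka record key to be.

```python
import json


def key_from_field(topic, payload, field="id"):
    """What I have now: the key is read from a field of the JSON payload,
    which is roughly what WITHKEY(id) does in the KCQL above."""
    return str(json.loads(payload)[field])


def key_from_topic(topic, payload):
    """What I would like: the key is the MQTT topic the message arrived on,
    regardless of the payload contents."""
    return topic


# Hypothetical MQTT message, for illustration only.
topic = "sensors/room1/temperature"
payload = '{"id": 42, "value": 21.5}'

print(key_from_field(topic, payload))  # 42
print(key_from_topic(topic, payload))  # sensors/room1/temperature
```

With the second behaviour, messages from the same MQTT topic would share a key without the payload having to contain an id field.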