I am following https://github.com/kaiwaehner/kafka-connect-iot-mqtt-connector-example to connect Mosquitto and Kafka with the MQTT source connector. Messages sent by the Mosquitto publisher arrive at both the Mosquitto subscriber and the Kafka consumer. However, in the Kafka consumer, the key and value fields of each ConsumerRecord have some extra bytes prepended. Below are the code snippets and the output I'm getting.
mqttPublisher.py
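import datetime
import json
import time

# Excerpt: `client` (a connected MQTT client) and `v3` are set up earlier (not shown).
while v3 < 3:
    data3 = {
        "time": str(datetime.datetime.now().time()),
        "val": v3
    }
    client.publish("sensor/dist", json.dumps(data3), qos=2)
    v3 += 1
    time.sleep(2)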
mqttSubscriber.py
import paho.mqtt.subscribe as subscribe

def on_message_print(client, userdata, message):
    print(message.topic, message.payload)

subscribe.callback(on_message_print, "sensor/#", hostname="localhost")
kafkaConsumer.py
from kafka import KafkaConsumer

consumer = KafkaConsumer('mqtt.',
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
    print(message)
Output: mqttSubscriber.py
sensor/dist b'{"time": "12:44:30.817462", "val": 0}'
sensor/dist b'{"time": "12:44:32.820040", "val": 1}'
sensor/dist b'{"time": "12:44:34.822657", "val": 2}'
Output: kafkaConsumer.py
ConsumerRecord(topic='mqtt.', partition=0, offset=225, timestamp=1545117270870, timestamp_type=0, key=b'\x00\x00\x00\x00\x01\x16sensor/dist', value=b'\x00\x00\x00\x00\x02J{"time": "12:44:30.817462", "val": 0}', headers=[('mqtt.message.id', b'0'), ('mqtt.qos', b'0'), ('mqtt.retained', b'false'), ('mqtt.duplicate', b'false')], checksum=None, serialized_key_size=17, serialized_value_size=43, serialized_header_size=62)
ConsumerRecord(topic='mqtt.', partition=0, offset=226, timestamp=1545117272821, timestamp_type=0, key=b'\x00\x00\x00\x00\x01\x16sensor/dist', value=b'\x00\x00\x00\x00\x02J{"time": "12:44:32.820040", "val": 1}', headers=[('mqtt.message.id', b'0'), ('mqtt.qos', b'0'), ('mqtt.retained', b'false'), ('mqtt.duplicate', b'false')], checksum=None, serialized_key_size=17, serialized_value_size=43, serialized_header_size=62)
ConsumerRecord(topic='mqtt.', partition=0, offset=227, timestamp=1545117274824, timestamp_type=0, key=b'\x00\x00\x00\x00\x01\x16sensor/dist', value=b'\x00\x00\x00\x00\x02J{"time": "12:44:34.822657", "val": 2}', headers=[('mqtt.message.id', b'0'), ('mqtt.qos', b'0'), ('mqtt.retained', b'false'), ('mqtt.duplicate', b'false')], checksum=None, serialized_key_size=17, serialized_value_size=43, serialized_header_size=62)
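The prefix has a regular shape, so here is a minimal sketch for taking it apart. It rests on an assumption I have not confirmed against the connector configuration: that the first five bytes are a magic byte plus a 4-byte big-endian ID (the layout used by the Confluent Schema Registry wire format), and that the following byte(s) are an Avro-style zigzag varint giving the payload length. The helper names `read_zigzag_varint` and `split_prefix` are made up for illustration.

```python
import json
import struct


def read_zigzag_varint(buf, pos=0):
    """Decode one Avro-style zigzag varint from buf, starting at pos.

    Returns (value, next_pos).
    """
    shift = 0
    raw = 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: last byte of the varint
            break
        shift += 7
    # Undo zigzag encoding: 0 -> 0, 1 -> -1, 2 -> 1, ...
    return (raw >> 1) ^ -(raw & 1), pos


def split_prefix(data):
    """Split bytes into (magic, ident, payload) under the ASSUMED layout:
    1 magic byte, 4-byte big-endian ID, zigzag varint length, payload."""
    magic, ident = struct.unpack(">bI", data[:5])
    length, start = read_zigzag_varint(data, 5)
    return magic, ident, data[start:start + length]


# Key and value copied from the first ConsumerRecord above.
key = b'\x00\x00\x00\x00\x01\x16sensor/dist'
value = b'\x00\x00\x00\x00\x02J{"time": "12:44:30.817462", "val": 0}'

print(split_prefix(key))  # (0, 1, b'sensor/dist')
magic, ident, payload = split_prefix(value)
print(magic, ident, json.loads(payload))
```

With the sample record the numbers line up: `\x16` decodes to 11, the length of `sensor/dist`, and `J` (0x4a) decodes to 37, the length of the JSON payload. That suggests the bytes are added by whatever serializer or converter the Connect worker uses rather than by Mosquitto, but this is only a guess.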
What is causing these extra bytes to be prepended to the key and value in the Kafka consumer? Thanks in advance.