3 votes

I am following https://github.com/kaiwaehner/kafka-connect-iot-mqtt-connector-example to connect Mosquitto and Kafka with the MQTT source connector. The data sent by the Mosquitto publisher arrives at both the Mosquitto subscriber and the Kafka consumer, but the key and value fields in the ConsumerRecord objects of my Kafka consumer have some extra bytes prepended. Below are the code snippets and the outputs I'm getting.

mqttPublisher.py

import datetime
import json
import time

import paho.mqtt.client as mqtt

client = mqtt.Client()
client.connect("localhost")

v3 = 0
while v3 < 3:
    data3 = {
        "time": str(datetime.datetime.now().time()),
        "val": v3
    }
    client.publish("sensor/dist", json.dumps(data3), qos=2)
    v3 += 1
    time.sleep(2)

mqttSubscriber.py

import paho.mqtt.subscribe as subscribe

def on_message_print(client, userdata, message):
    print(message.topic, message.payload)

subscribe.callback(on_message_print, "sensor/#", hostname="localhost")

kafkaConsumer.py

from kafka import KafkaConsumer

consumer = KafkaConsumer('mqtt.',
                         bootstrap_servers=['localhost:9092'])

for message in consumer:
    print(message)

Output: mqttSubscriber.py

sensor/dist b'{"time": "12:44:30.817462", "val": 0}'

sensor/dist b'{"time": "12:44:32.820040", "val": 1}'

sensor/dist b'{"time": "12:44:34.822657", "val": 2}'

Output: kafkaConsumer.py

ConsumerRecord(topic='mqtt.', partition=0, offset=225, timestamp=1545117270870, timestamp_type=0, key=b'\x00\x00\x00\x00\x01\x16sensor/dist', value=b'\x00\x00\x00\x00\x02J{"time": "12:44:30.817462", "val": 0}', headers=[('mqtt.message.id', b'0'), ('mqtt.qos', b'0'), ('mqtt.retained', b'false'), ('mqtt.duplicate', b'false')], checksum=None, serialized_key_size=17, serialized_value_size=43, serialized_header_size=62)

ConsumerRecord(topic='mqtt.', partition=0, offset=226, timestamp=1545117272821, timestamp_type=0, key=b'\x00\x00\x00\x00\x01\x16sensor/dist', value=b'\x00\x00\x00\x00\x02J{"time": "12:44:32.820040", "val": 1}', headers=[('mqtt.message.id', b'0'), ('mqtt.qos', b'0'), ('mqtt.retained', b'false'), ('mqtt.duplicate', b'false')], checksum=None, serialized_key_size=17, serialized_value_size=43, serialized_header_size=62)

ConsumerRecord(topic='mqtt.', partition=0, offset=227, timestamp=1545117274824, timestamp_type=0, key=b'\x00\x00\x00\x00\x01\x16sensor/dist', value=b'\x00\x00\x00\x00\x02J{"time": "12:44:34.822657", "val": 2}', headers=[('mqtt.message.id', b'0'), ('mqtt.qos', b'0'), ('mqtt.retained', b'false'), ('mqtt.duplicate', b'false')], checksum=None, serialized_key_size=17, serialized_value_size=43, serialized_header_size=62)

What is causing these extra bytes to be prepended in the Kafka consumer? Thanks in advance.


1 Answer

1 vote

As part of the demo, you're starting a Schema Registry:

Start Kafka Connect and dependencies (Kafka, Zookeeper, Schema Registry):

confluent start connect

If you look at the first 5 bytes of the key and value, you'll see they start with a 0 (the magic byte), followed by four more bytes representing the schema ID as an integer.
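For illustration, here's a minimal sketch of splitting that 5-byte header off one of the raw values from your output (assuming the standard Confluent wire format):

import struct

# Raw value from the first ConsumerRecord above
raw = b'\x00\x00\x00\x00\x02J{"time": "12:44:30.817462", "val": 0}'

magic_byte = raw[0]                           # always 0 in the Confluent wire format
schema_id = struct.unpack('>I', raw[1:5])[0]  # big-endian 4-byte schema ID -> 2
body = raw[5:]                                # Avro-encoded payload, not bare JSON:
                                              # the leading b'J' (0x4A) is the Avro
                                              # zigzag-varint length prefix (37) of the string

print(magic_byte, schema_id, body)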

See the Schema Registry Wire Format and try doing a curl localhost:8081/subjects to see if it lists your topic name under the mqtt-key and mqtt-value subjects.

If you didn't want Avro, you would need to edit your Kafka Connect property file to use different converters, and not use confluent start beyond getting Kafka and Zookeeper running.
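As a sketch, the converter settings in the Connect worker properties file could look like this (StringConverter is one option here, since your payload is already JSON text; the exact file name depends on how you run Connect):

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

With those set, the consumer would receive the bytes as the publisher sent them, with no 5-byte header.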

Or, if you want Python to deserialize the Avro, you can refer to the confluent-kafka-python repo on GitHub.
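For example, a minimal sketch using that repo's AvroConsumer, assuming Schema Registry is at localhost:8081 (the group id is made up):

from confluent_kafka.avro import AvroConsumer

consumer = AvroConsumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mqtt-demo',  # hypothetical group id
    'schema.registry.url': 'http://localhost:8081'
})
consumer.subscribe(['mqtt.'])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print(msg.error())
        continue
    # Keys and values are deserialized using schemas fetched from the registry
    print(msg.key(), msg.value())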