
I am trying to establish a data flow in which a Mosquitto publisher sends data to a Kafka broker via the MQTT Source Connector, and the Kafka broker forwards that data to a MongoDB database via the MongoDB Sink Connector. Each connector works properly on its own, but when I integrate the two, the sink connector throws an exception. I've spent hours looking for a solution without success. I need help establishing this data flow.

mqttPublisher.py

import datetime, json
import paho.mqtt.client as mqtt

client = mqtt.Client()
client.connect("127.0.0.1", 1883)  # broker from mqtt.server.uri below
client.loop_start()  # run the network loop so QoS 2 delivery completes

data = {
    "time": str(datetime.datetime.now().time()),
    "val": 0
}
client.publish("dist", json.dumps(data), qos=2).wait_for_publish()

kafkaConsumer.py

from kafka import KafkaConsumer  # kafka-python package

consumer = KafkaConsumer('mqtt.',
                         bootstrap_servers='localhost:9092')
for msg in consumer:
    print(msg)

source-anonymous.properties

name=MQTT-source
tasks.max=1
connector.class=io.confluent.connect.mqtt.MqttSourceConnector
mqtt.server.uri=tcp://127.0.0.1:1883
mqtt.topics=dist
kafka.topics=mqtt.
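
For reference, the same source connector can also be registered through the Kafka Connect REST interface instead of a properties file. A minimal sketch using the requests package, assuming the worker exposes its REST API on the default port 8083:

import requests

# Equivalent of source-anonymous.properties, submitted to the Connect
# REST API (port 8083 is the worker's default and an assumption here).
config = {
    "name": "MQTT-source",
    "config": {
        "tasks.max": "1",
        "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
        "mqtt.server.uri": "tcp://127.0.0.1:1883",
        "mqtt.topics": "dist",
        "kafka.topics": "mqtt.",
    },
}
resp = requests.post("http://localhost:8083/connectors", json=config)
print(resp.status_code, resp.json())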

MongoDbSinkConnector.properties

name=MyMongoDbSinkConnector
topics=mqtt.
tasks.max=1

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
connector.class=at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector

mongodb.connection.uri=mongodb://localhost:27017/sample?w=1&journal=true
mongodb.collection=data
mongodb.max.num.retries=3
mongodb.retries.defer.timeout=5000
mongodb.document.id.strategy=at.grahsl.kafka.connect.mongodb.processor.id.strategy.BsonOidStrategy
mongodb.post.processor.chain=at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder
mongodb.delete.on.null.values=false
mongodb.writemodel.strategy=at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneDefaultStrategy
mongodb.max.batch.size=0

Output (kafkaConsumer.py)

ConsumerRecord(topic='mqtt.', partition=0, offset=0, timestamp=1545759406558, timestamp_type=0, key=b'\x00\x00\x00\x00\x01\x08dist', value=b'\x00\x00\x00\x00\x02J{"time": "23:06:46.548284", "val": 0}', headers=[('mqtt.message.id', b'0'), ('mqtt.qos', b'0'), ('mqtt.retained', b'false'), ('mqtt.duplicate', b'false')], checksum=None, serialized_key_size=10, serialized_value_size=43, serialized_header_size=62)
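
The key and value bytes above follow the Confluent wire format: one zero magic byte, a 4-byte big-endian schema ID (1 for the key, 2 for the value, matching the subjects shown in the update below), then the Avro-encoded payload. Since the registered value schema is plain "bytes", the Avro body is a zigzag-varint length followed by raw bytes; the stray J (0x4A, zigzag for 37) is that length prefix, not part of the JSON. The key decodes to the bare string dist, which is not valid JSON and is plausibly what BsonDocument.parse trips over in the stack trace below. A small sketch that peels the header apart by hand (rather than via the Avro library):

import struct

def decode_confluent(raw: bytes):
    """Split a Confluent wire-format message into (schema_id, payload).

    Layout: 0x00 magic byte, 4-byte big-endian schema ID, Avro body.
    For the "string"/"bytes" schemas registered here, the body is a
    zigzag-varint length followed by that many raw bytes.
    """
    assert raw[0] == 0, "not Confluent wire format"
    schema_id = struct.unpack(">I", raw[1:5])[0]
    body, n, shift, i = raw[5:], 0, 0, 0
    while True:  # decode the zigzag varint length prefix
        b = body[i]
        i += 1
        n |= (b & 0x7F) << shift
        if not b & 0x80:
            break
        shift += 7
    length = (n >> 1) ^ -(n & 1)  # zigzag -> signed int
    return schema_id, body[i:i + length]

key = b"\x00\x00\x00\x00\x01\x08dist"
value = b'\x00\x00\x00\x00\x02J{"time": "23:06:46.548284", "val": 0}'
print(decode_confluent(key))    # -> (1, b'dist')
print(decode_confluent(value))  # -> (2, b'{"time": "23:06:46.548284", "val": 0}')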

CLI command for starting the connectors:

bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties share/confluent-hub-components/confluentinc-kafka-connect-mqtt/etc/source-anonymous.properties share/confluent-hub-components/hpgrahsl-kafka-connect-mongodb/etc/MongoDbSinkConnector.properties

Logs

[2018-12-25 23:07:52,280] INFO Created connector MQTT-source (org.apache.kafka.connect.cli.ConnectStandalone:104)
[2018-12-25 23:07:52,346] INFO Connecting to Mqtt Server. (io.confluent.connect.mqtt.MqttSourceTask:67)
[2018-12-25 23:07:52,371] INFO Subscribing to dist with QOS of 0 (io.confluent.connect.mqtt.MqttSourceTask:76)
[2018-12-25 23:07:52,380] INFO WorkerSourceTask{id=MQTT-source-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:199)
.
.
.
.
[2018-12-25 23:07:52,615] INFO Creating connector MyMongoDbSinkConnector of type at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector (org.apache.kafka.connect.runtime.Worker:235)
[2018-12-25 23:07:52,616] INFO Instantiated connector MyMongoDbSinkConnector with version 1.2.0 of type class at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector (org.apache.kafka.connect.runtime.Worker:238)
[2018-12-25 23:07:52,616] INFO Finished creating connector MyMongoDbSinkConnector (org.apache.kafka.connect.runtime.Worker:257)
.
.
.
[2018-12-25 23:07:52,706] INFO Created connector MyMongoDbSinkConnector (org.apache.kafka.connect.cli.ConnectStandalone:104)
[2018-12-25 23:07:52,708] INFO starting MongoDB sink task (at.grahsl.kafka.connect.mongodb.MongoDbSinkTask:78)
.
.
[2018-12-25 23:07:52,943] INFO Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500} (org.mongodb.driver.cluster:71)
[2018-12-25 23:07:53,013] INFO WorkerSinkTask{id=MyMongoDbSinkConnector-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:302)
[2018-12-25 23:07:53,037] INFO Cluster ID: VX_AdknXRGGfEWsSdcSpSw (org.apache.kafka.clients.Metadata:285)
[2018-12-25 23:07:53,057] INFO Opened connection [connectionId{localValue:1, serverValue:14}] to localhost:27017 (org.mongodb.driver.connection:71)
[2018-12-25 23:07:53,063] INFO Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 6, 4]}, minWireVersion=0, maxWireVersion=6, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=3937492} (org.mongodb.driver.cluster:71)
[2018-12-25 23:07:53,869] INFO [Consumer clientId=consumer-1, groupId=connect-MyMongoDbSinkConnector] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:677)
[2018-12-25 23:07:53,871] INFO [Consumer clientId=consumer-1, groupId=connect-MyMongoDbSinkConnector] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:472)
[2018-12-25 23:07:53,871] INFO [Consumer clientId=consumer-1, groupId=connect-MyMongoDbSinkConnector] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:509)
[2018-12-25 23:07:53,976] INFO [Consumer clientId=consumer-1, groupId=connect-MyMongoDbSinkConnector] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:473)
[2018-12-25 23:07:53,980] INFO [Consumer clientId=consumer-1, groupId=connect-MyMongoDbSinkConnector] Setting newly assigned partitions [mqtt.-0] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:280)
[2018-12-25 23:07:53,991] INFO [Consumer clientId=consumer-1, groupId=connect-MyMongoDbSinkConnector] Resetting offset for partition mqtt.-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:583)
[2018-12-25 23:07:54,189] ERROR WorkerSinkTask{id=MyMongoDbSinkConnector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. 
(org.apache.kafka.connect.runtime.WorkerSinkTask:584)
    org.bson.json.JsonParseException: JSON reader was expecting a value but found 'dist'.
    at org.bson.json.JsonReader.readBsonType(JsonReader.java:251)
    at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:680)
    at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:722)
    at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:450)
    at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:81)
    at org.bson.BsonDocument.parse(BsonDocument.java:62)
    at at.grahsl.kafka.connect.mongodb.converter.JsonRawStringRecordConverter.convert(JsonRawStringRecordConverter.java:32)
    at at.grahsl.kafka.connect.mongodb.converter.SinkConverter.convert(SinkConverter.java:44)
    at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$buildWriteModel$3(MongoDbSinkTask.java:186)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.buildWriteModel(MongoDbSinkTask.java:185)
    at  at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.processSinkRecords(MongoDbSinkTask.java:122)
    .
    .

MongoDB Sink Connector: https://github.com/hpgrahsl/kafka-connect-mongodb

Kafka Connect MQTT Source Connector demo: https://github.com/kaiwaehner/kafka-connect-iot-mqtt-connector-example/blob/master/live-demo-kafka-connect-iot-mqtt-connector.adoc

UPDATE:

Following is the output I'm getting from kafka-avro-console-consumer:

"dist"  "J{\"time\": \"23:06:46.548284\", \"val\": 0}"

CLI command used:

bin/kafka-avro-console-consumer --topic mqtt. --bootstrap-server localhost:9092 --property print.key=true

Schemas registered in the Schema Registry:

{"subject":"mqtt.-key","version":1,"id":1,"schema":"\"string\""}
{"subject":"mqtt.-value","version":1,"id":2,"schema":"\"bytes\""}

Command used to fetch the schemas:

curl --silent -X GET http://localhost:8081/subjects/mqtt.-[key|value]/versions/latest
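
(The [key|value] bracket above is shorthand for running the request once per subject. A small Python equivalent, assuming the requests package:)

import requests

# fetch the latest registered schema for each subject
for subject in ("mqtt.-key", "mqtt.-value"):
    url = f"http://localhost:8081/subjects/{subject}/versions/latest"
    print(requests.get(url).json())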
Comments:

- I suggest trying the console sink connector to debug. But the issue appears to be with the Mongo connector. For example, are you supposed to be using BsonOidStrategy? – OneCricketeer
- The Mongo sink connector works properly when I send data from a Kafka producer, but with the Mosquitto publisher it throws exceptions. I am using the default BsonOidStrategy since it can't be left empty. – Shubham
- And the "working" version is using Avro for keys and values? My point was only that the stack trace starts with BsonDocument.parse, so some BSON data is getting generated incorrectly. – OneCricketeer
- Yep, I'm using the Avro key-value converters. – Shubham
- I changed the converters being used and added an SMT; that solved my problem. – Shubham

1 Answer


So this worked for me:

  1. Added this SMT to source-anonymous.properties:

     transforms=requiredKeyExample
     transforms.requiredKeyExample.type=io.confluent.connect.transforms.ExtractTopic$Key

  2. Changed the key-value converters in connect-standalone.properties to:

     key.converter=org.apache.kafka.connect.json.JsonConverter
     value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

  3. Changed the key-value converters in MongoDbSinkConnector.properties to:

     key.converter=org.apache.kafka.connect.json.JsonConverter
     value.converter=org.apache.kafka.connect.json.JsonConverter
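
Presumably this works because ByteArrayConverter passes the raw JSON payload from MQTT through to the topic untouched, rather than wrapping it in the Avro wire format that the sink was choking on. To verify the fixed pipeline end to end, a quick sanity check that documents actually land in MongoDB — a sketch assuming the pymongo package and the sample database / data collection from the sink config above:

from pymongo import MongoClient

# "sample" database and "data" collection come from mongodb.connection.uri
# and mongodb.collection in MongoDbSinkConnector.properties
client = MongoClient("mongodb://localhost:27017")
for doc in client.sample.data.find().sort("_id", -1).limit(5):
    print(doc)  # the newest documents written by the sink connector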