I am trying to establish a data flow in which a Mosquitto publisher sends data to a Kafka broker via the MQTT Source Connector, and the Kafka broker forwards the incoming data to a MongoDB database via the MongoDB Sink Connector. Each connector works correctly when tested individually. However, when I run both connectors together, the sink task throws an exception. I have spent many hours looking for a solution without success, and I need help establishing this data flow.
mqttPublisher.py
import datetime
import json
import paho.mqtt.client as mqtt

client = mqtt.Client()
client.connect("127.0.0.1", 1883)
client.loop_start()

data = {
    "time": str(datetime.datetime.now().time()),
    "val": 0
}
client.publish("dist", json.dumps(data), qos=2).wait_for_publish()
kafkaConsumer.py
from kafka import KafkaConsumer

consumer = KafkaConsumer('mqtt.',
                         bootstrap_servers='localhost:9092')
for msg in consumer:
    print(msg)
source-anonymous.properties
name=MQTT-source
tasks.max=1
connector.class=io.confluent.connect.mqtt.MqttSourceConnector
mqtt.server.uri=tcp://127.0.0.1:1883
mqtt.topics=dist
kafka.topics=mqtt.
MongoDbSinkConnector.properties
name=MyMongoDbSinkConnector
topics=mqtt.
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
connector.class=at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector
mongodb.connection.uri=mongodb://localhost:27017/sample?w=1&journal=true
mongodb.collection=data
mongodb.max.num.retries=3
mongodb.retries.defer.timeout=5000
mongodb.document.id.strategy=at.grahsl.kafka.connect.mongodb.processor.id.strategy.BsonOidStrategy
mongodb.post.processor.chain=at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder
mongodb.delete.on.null.values=false
mongodb.writemodel.strategy=at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneDefaultStrategy
mongodb.max.batch.size=0
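For reference, when I test the sink connector on its own I verify writes with a quick pymongo check against the sample database and data collection from the config above:

from pymongo import MongoClient

# connect to the same standalone MongoDB instance the sink writes to
client = MongoClient("mongodb://localhost:27017")
for doc in client["sample"]["data"].find():
    print(doc)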
Output (kafkaConsumer.py)
ConsumerRecord(topic='mqtt.', partition=0, offset=0, timestamp=1545759406558, timestamp_type=0, key=b'\x00\x00\x00\x00\x01\x08dist', value=b'\x00\x00\x00\x00\x02J{"time": "23:06:46.548284", "val": 0}', headers=[('mqtt.message.id', b'0'), ('mqtt.qos', b'0'), ('mqtt.retained', b'false'), ('mqtt.duplicate', b'false')], checksum=None, serialized_key_size=10, serialized_value_size=43, serialized_header_size=62)
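Those key/value bytes look like the standard Confluent Avro wire format (a 0x00 magic byte, a 4-byte schema id, then the Avro payload). A minimal sketch that unwraps the two records above under that assumption; as far as I can tell, the leading J on the value is the zigzag-encoded Avro length prefix (0x4A = 37 bytes of JSON):

import struct

def decode_confluent_frame(raw):
    # Confluent wire format: magic byte 0x00, 4-byte schema id, Avro payload
    assert raw[0] == 0
    schema_id = struct.unpack(">I", raw[1:5])[0]
    # for a top-level "string"/"bytes" schema the payload is a
    # zigzag varint length followed by that many bytes
    n, pos, shift = 0, 5, 0
    while True:
        b = raw[pos]
        pos += 1
        n |= (b & 0x7F) << shift
        shift += 7
        if not b & 0x80:
            break
    length = (n >> 1) ^ -(n & 1)  # zigzag decode
    return schema_id, raw[pos:pos + length]

key = b"\x00\x00\x00\x00\x01\x08dist"
value = b'\x00\x00\x00\x00\x02J{"time": "23:06:46.548284", "val": 0}'
print(decode_confluent_frame(key))    # (1, b'dist')
print(decode_confluent_frame(value))  # (2, b'{"time": "23:06:46.548284", "val": 0}')

So the key deserializes to the plain string dist and the value to the raw JSON bytes, which matches the "string" and "bytes" schemas shown in the update below.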
CLI command used to start the connectors:
bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties share/confluent-hub-components/confluentinc-kafka-connect-mqtt/etc/source-anonymous.properties share/confluent-hub-components/hpgrahsl-kafka-connect-mongodb/etc/MongoDbSinkConnector.properties
Logs
[2018-12-25 23:07:52,280] INFO Created connector MQTT-source (org.apache.kafka.connect.cli.ConnectStandalone:104)
[2018-12-25 23:07:52,346] INFO Connecting to Mqtt Server. (io.confluent.connect.mqtt.MqttSourceTask:67)
[2018-12-25 23:07:52,371] INFO Subscribing to dist with QOS of 0 (io.confluent.connect.mqtt.MqttSourceTask:76)
[2018-12-25 23:07:52,380] INFO WorkerSourceTask{id=MQTT-source-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:199)
.
.
.
.
[2018-12-25 23:07:52,615] INFO Creating connector MyMongoDbSinkConnector of type at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector (org.apache.kafka.connect.runtime.Worker:235)
[2018-12-25 23:07:52,616] INFO Instantiated connector MyMongoDbSinkConnector with version 1.2.0 of type class at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector (org.apache.kafka.connect.runtime.Worker:238)
[2018-12-25 23:07:52,616] INFO Finished creating connector MyMongoDbSinkConnector (org.apache.kafka.connect.runtime.Worker:257)
.
.
.
[2018-12-25 23:07:52,706] INFO Created connector MyMongoDbSinkConnector (org.apache.kafka.connect.cli.ConnectStandalone:104)
[2018-12-25 23:07:52,708] INFO starting MongoDB sink task (at.grahsl.kafka.connect.mongodb.MongoDbSinkTask:78)
.
.
[2018-12-25 23:07:52,943] INFO Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500} (org.mongodb.driver.cluster:71)
[2018-12-25 23:07:53,013] INFO WorkerSinkTask{id=MyMongoDbSinkConnector-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:302)
[2018-12-25 23:07:53,037] INFO Cluster ID: VX_AdknXRGGfEWsSdcSpSw (org.apache.kafka.clients.Metadata:285)
[2018-12-25 23:07:53,057] INFO Opened connection [connectionId{localValue:1, serverValue:14}] to localhost:27017 (org.mongodb.driver.connection:71)
[2018-12-25 23:07:53,063] INFO Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 6, 4]}, minWireVersion=0, maxWireVersion=6, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=3937492} (org.mongodb.driver.cluster:71)
[2018-12-25 23:07:53,869] INFO [Consumer clientId=consumer-1, groupId=connect-MyMongoDbSinkConnector] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:677)
[2018-12-25 23:07:53,871] INFO [Consumer clientId=consumer-1, groupId=connect-MyMongoDbSinkConnector] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:472)
[2018-12-25 23:07:53,871] INFO [Consumer clientId=consumer-1, groupId=connect-MyMongoDbSinkConnector] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:509)
[2018-12-25 23:07:53,976] INFO [Consumer clientId=consumer-1, groupId=connect-MyMongoDbSinkConnector] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:473)
[2018-12-25 23:07:53,980] INFO [Consumer clientId=consumer-1, groupId=connect-MyMongoDbSinkConnector] Setting newly assigned partitions [mqtt.-0] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:280)
[2018-12-25 23:07:53,991] INFO [Consumer clientId=consumer-1, groupId=connect-MyMongoDbSinkConnector] Resetting offset for partition mqtt.-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:583)
[2018-12-25 23:07:54,189] ERROR WorkerSinkTask{id=MyMongoDbSinkConnector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted.
(org.apache.kafka.connect.runtime.WorkerSinkTask:584)
org.bson.json.JsonParseException: JSON reader was expecting a value but found 'dist'.
at org.bson.json.JsonReader.readBsonType(JsonReader.java:251)
at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:680)
at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:722)
at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:450)
at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:81)
at org.bson.BsonDocument.parse(BsonDocument.java:62)
at at.grahsl.kafka.connect.mongodb.converter.JsonRawStringRecordConverter.convert(JsonRawStringRecordConverter.java:32)
at at.grahsl.kafka.connect.mongodb.converter.SinkConverter.convert(SinkConverter.java:44)
at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$buildWriteModel$3(MongoDbSinkTask.java:186)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.buildWriteModel(MongoDbSinkTask.java:185)
at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.processSinkRecords(MongoDbSinkTask.java:122)
.
.
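If I read the stack trace correctly, JsonRawStringRecordConverter ends up handing the record key (the plain string dist) to BsonDocument.parse, which would explain the parser complaining about finding 'dist' where it expected a JSON value. A tiny Python analogue of the same parse failure:

import json

# "dist" is a bare word, not a JSON document, so the parser fails
# the same way BsonDocument.parse("dist") does in the sink task
json.loads("dist")  # raises json.decoder.JSONDecodeError: Expecting value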
MongoDB Sink Connector: https://github.com/hpgrahsl/kafka-connect-mongodb
Kafka Connect MQTT Source Connector demo: https://github.com/kaiwaehner/kafka-connect-iot-mqtt-connector-example/blob/master/live-demo-kafka-connect-iot-mqtt-connector.adoc
UPDATE:
Following is the output I get from kafka-avro-console-consumer:
"dist" "J{\"time\": \"23:06:46.548284\", \"val\": 0}"
CLI command used:
bin/kafka-avro-console-consumer --topic mqtt. --bootstrap-server localhost:9092 --property print.key=true
Schemas registered in the Schema Registry:
{"subject":"mqtt.-key","version":1,"id":1,"schema":"\"string\""}
{"subject":"mqtt.-value","version":1,"id":2,"schema":"\"bytes\""}
Command used to fetch the schemas:
curl --silent -X GET http://localhost:8081/subjects/mqtt.-[key|value]/versions/latest
Comments:
BsonOidStrategy? – OneCricketeer
The failure is in BsonDocument.parse, so some BSON data is getting generated incorrectly – OneCricketeer