I'm using Confluent Platform 5.3.1 and have defined two different MQTT source connectors in distributed mode using:

curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{
"name" : "mqtt-source1",
"config" : {
"connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
"tasks.max" : "1",
"mqtt.server.uri" : "<IP-ADDRESS 1>",
"mqtt.topics" : "<TOPIC MQTT 1>",
"kafka.topics" : "mqtt1",
"value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable" : "false",
"value.converter.schemas.enable" : "false",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"confluent.license":""
}
}'

and

curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{
"name" : "mqtt-source2",
"config" : {
"connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
"tasks.max" : "1",
"mqtt.server.uri" : "<IP-ADDRESS 2>",
"mqtt.topics" : "<TOPIC MQTT 2>",
"kafka.topics" : "mqtt2",
"value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable" : "false",
"value.converter.schemas.enable" : "false",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"confluent.license":""
}
}'

I have some questions:

1) Looking at the status of the connectors, I get the same result for both (an example response follows):

{
  "name": "mqtt-source1",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.0.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "127.0.0.1:8083"
    }
  ],
  "type": "source"
}

2) When I create the first connector, the topic "mqtt" is automatically created in Kafka. I intended to use two different Kafka topics ("mqtt1" and "mqtt2"), as set in the two connectors, and created them myself, but I can't read any data from them. Why? The IP address and the MQTT topic are different in the two connectors.

Thanks in advance.

Please clarify question 1; it reads more like a statement to me. If it is a question, can you explain what your question or concern is? – Robin Moffatt
Yes, I'm sorry; I forgot to post the question. I was asking whether you can run two different connectors within the same worker. I'm reading the documentation and it seems possible. – Andrea Calvaresi

1 Answer

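The configuration property is kafka.topic, not kafka.topics.

Since you've specified kafka.topics, which the connector does not recognize, it falls back to the default value of kafka.topic, which is mqtt. That is why the "mqtt" topic gets created and why mqtt1 and mqtt2 stay empty.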

Ref: Configuration Properties
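
As a rough sketch of the fix, the existing connector could be updated in place through the Connect REST API's PUT /connectors/{name}/config endpoint, which takes the bare config map rather than the name/config wrapper used with POST. The host, port, and placeholders are copied from the question; the CONFIG variable name is only for illustration.

```shell
# Corrected config for mqtt-source1: "kafka.topic" (singular) replaces "kafka.topics".
# <IP-ADDRESS 1> and <TOPIC MQTT 1> are the placeholders from the question.
CONFIG='{
  "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
  "tasks.max": "1",
  "mqtt.server.uri": "<IP-ADDRESS 1>",
  "mqtt.topics": "<TOPIC MQTT 1>",
  "kafka.topic": "mqtt1",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "confluent.topic.bootstrap.servers": "localhost:9092",
  "confluent.topic.replication.factor": "1",
  "confluent.license": ""
}'

# PUT creates the connector if it is missing, or updates its config if it exists.
curl -sS -X PUT -H 'Content-Type: application/json' \
  http://localhost:8083/connectors/mqtt-source1/config -d "$CONFIG" \
  || echo "Connect REST API not reachable" >&2
```

The same change applies to mqtt-source2 with "kafka.topic" set to "mqtt2". The "mqtt" topic that was created earlier is not removed by this update.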