6
votes

We are running Kafka Connect (Confluent Platform 5.4, ie. Kafka 2.4) in a distributed mode using Debezium (MongoDB) and Confluent S3 connectors. When adding a new connector via the REST API the connector is created in RUNNING state, but no tasks are created for the connector.

Pausing and resuming the connector does not help. When we stop all workers and then start them again, the tasks are created and everything runs as it should.

The issue is not caused by the connector plugins, because we see the same behaviour for both Debezium and S3 connectors. Also in debug logs I can see that Debezium is correctly returning a task configuration from the Connector.taskConfigs() method.

Can somebody tell me what to do se we can add connectors without restarting the workers? Thanks.

Configuration details

The cluster has 3 nodes with the following connect-distributed.properties:

bootstrap.servers=kafka-broker-001:9092,kafka-broker-002:9092,kafka-broker-003:9092,kafka-broker-004:9092
group.id=tdp-QA-connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.topic=connect-offsets-qa
offset.storage.replication.factor=3
offset.storage.partitions=5

config.storage.topic=connect-configs-qa
config.storage.replication.factor=3

status.storage.topic=connect-status-qa
status.storage.replication.factor=3
status.storage.partitions=3

offset.flush.interval.ms=10000

rest.host.name=tdp-QA-kafka-connect-001
rest.port=10083
rest.advertised.host.name=tdp-QA-kafka-connect-001
rest.advertised.port=10083

plugin.path=/opt/kafka-connect/plugins,/usr/share/java/

security.protocol=SSL
ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
ssl.truststore.password=<secret>
ssl.endpoint.identification.algorithm=
producer.security.protocol=SSL
producer.ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
producer.ssl.truststore.password=<secret>
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
consumer.ssl.truststore.password=<secret>

max.request.size=20000000
max.partition.fetch.bytes=20000000

The connectors configuration

Debezium example:

{
  "name": "qa-mongodb-comp-converter-task|1",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "mongodb-qa-001:27017,mongodb-qa-002:27017,mongodb-qa-003:27017",
    "mongodb.name": "qa-debezium-comp",
    "mongodb.ssl.enabled": true,
    "collection.whitelist": "converter[.]task",
    "tombstones.on.delete": true
  }
}

S3 example:

{
  "name": "qa-s3-sink-task|1",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "qa-debezium-comp.converter.task",
    "topics.dir": "data/env/qa",
    "s3.region": "eu-west-1",
    "s3.bucket.name": "<bucket-name>",
    "flush.size": "15000",
    "rotate.interval.ms": "3600000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "custom.kafka.connect.s3.format.plaintext.PlaintextFormat",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "schema.compatibility": "NONE",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false,
    "transforms": "ExtractDocument",
    "transforms.ExtractDocument.type":"custom.kafka.connect.transforms.ExtractDocument$Value"
  }
}

The connectors are created using curl: curl -X POST -H "Content-Type: application/json" --data @<json_file> http:/<connect_host>:10083/connectors

1
Faced the same issue many times (stackoverflow.com/q/55622904/7109598), but no solution was foundIskuskov Alexander
Never have had this problem. Please show all relevant configs, commands, installation details, etc.... cc @IskuskovOneCricketeer
@cricket_007 I added the configuration details. We add the connectors via the REST API using curl -X POST ...user2732824
Creating a jira issue at issues.apache.org/jira/projects/KAFKA/issues would probably help with investigation. Please include logs in the ticket (redacted if needed). Debug level will be quite more informative with respect to how tasks are created and are assigned during a rebalance. Also, might worth checking whether you experience the issue with CP 5.3.2 too or try each of the three options for connect.protocol. Also, make sure that Connect's internal topics are created with the right settings (config topic needs to be compacted).Konstantine Karantasis
I think it is insane that Kafka Connect doesn't log a WARN or ERROR with information on why the task can't be created or sustained. I added a comment here: issues.apache.org/jira/browse/KAFKA-9747Patrick Szalapski

1 Answers

1
votes

I had the same problem, so i changed the name of the connector and create a new one, it worked but I don't know the source of this problem because we have no information in kafka-connect logs.