After I restarted a Kafka Connect S3 sink task, it started writing again from the very beginning of the topic and wrote duplicate copies of older records. In other words, Kafka Connect seemed to lose its place.
I assumed that Kafka Connect stores its current offset position in the internal connect-offsets topic. That topic is empty, which I presume is part of the problem.
The other two internal topics, connect-statuses and connect-configs, are not empty. connect-statuses has 52 entries. connect-configs has 6 entries, three for each of the two sink connectors I have configured: connector-<name>, task-<name>-0, and commit-<name>.
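The count of 6 follows from the three key kinds listed above. A minimal sketch that enumerates them, assuming one task per connector (task index 0, matching "tasks.max": 1 in the sink config below) and only the key prefixes observed in this topic; the second connector name is hypothetical.

```python
def expected_config_keys(connector_names, tasks_per_connector=1):
    """Return the connect-configs keys observed in the post for each connector:
    connector-<name>, one task-<name>-<i> per task, and commit-<name>."""
    keys = []
    for name in connector_names:
        keys.append(f"connector-{name}")
        keys.extend(f"task-{name}-{i}" for i in range(tasks_per_connector))
        keys.append(f"commit-{name}")
    return keys


# "other-sink" is a hypothetical name for the second sink connector.
keys = expected_config_keys(["my-s3-sink", "other-sink"])
print(len(keys))  # 6
print(keys[:3])   # ['connector-my-s3-sink', 'task-my-s3-sink-0', 'commit-my-s3-sink']
```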
I manually created the internal Kafka Connect topics as specified in the docs before running this:
```
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-statuses --replication-factor 3 --partitions 10 --config cleanup.policy=compact
```
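These topics are only used as Connect's internal storage if the distributed worker is configured with the same names. A minimal sketch that renders the relevant worker properties; config.storage.topic, offset.storage.topic and status.storage.topic are standard distributed-worker settings, while the bootstrap address and group.id value are assumptions for illustration.

```python
# Hypothetical worker settings: only the three *.storage.topic values come from
# the topics created above; bootstrap.servers and group.id are assumptions.
WORKER_SETTINGS = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "connect-cluster",
    "config.storage.topic": "connect-configs",
    "offset.storage.topic": "connect-offsets",
    "status.storage.topic": "connect-statuses",
}


def render_properties(settings):
    """Render a dict as Java .properties lines (key=value), sorted for stable output."""
    return "\n".join(f"{key}={value}" for key, value in sorted(settings.items())) + "\n"


print(render_properties(WORKER_SETTINGS), end="")
```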
I can verify that the connect-offsets topic appears to have been created correctly:
```
/usr/bin/kafka-topics --zookeeper localhost:2181 --describe --topic connect-offsets
Topic:connect-offsets PartitionCount:50 ReplicationFactor:3 Configs:cleanup.policy=compact
Topic: connect-offsets Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: connect-offsets Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: connect-offsets Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
<snip>
```
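Checking all 50 partitions of that output by eye is tedious. A minimal sketch that flags any partition whose in-sync replica set differs from its replica list, assuming the per-partition line layout shown above (other Kafka versions may format the describe output differently).

```python
import re

# Matches per-partition lines such as
# "Topic: connect-offsets Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3"
PARTITION_LINE = re.compile(
    r"Partition:\s*(\d+)\s+Leader:\s*(-?\d+)\s+Replicas:\s*([\d,]+)\s+Isr:\s*([\d,]*)"
)


def under_replicated(describe_output):
    """Return partition numbers whose ISR set differs from the replica set."""
    problems = []
    for line in describe_output.splitlines():
        match = PARTITION_LINE.search(line)
        if not match:
            continue  # header line, "<snip>", or blank line
        partition, _leader, replicas, isr = match.groups()
        if set(replicas.split(",")) != set(filter(None, isr.split(","))):
            problems.append(int(partition))
    return problems


sample = """Topic:connect-offsets PartitionCount:50 ReplicationFactor:3 Configs:cleanup.policy=compact
Topic: connect-offsets Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: connect-offsets Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: connect-offsets Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2"""
print(under_replicated(sample))  # []
```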
This is on a three-server cluster running Confluent Platform v3.2.1, which ships Kafka 0.10.2.1.
Is connect-offsets supposed to be empty? If so, why else would Kafka Connect start again from the beginning of the topic when a task is restarted?
UPDATE: Response to Randall Hauch's answer.
- The explanation of source connector offsets vs. sink connector offsets accounts for the empty connect-offsets topic. Thanks for the explanation!
- I'm definitely not changing the connector name.
- If the connector is down for ~five days and restarted afterwards, is there any reason that the connector's offset position would expire and reset? I see __consumer_offsets has cleanup.policy=compact, and auto.offset.reset should only take effect if there is no position in __consumer_offsets, right?
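The question in the last point can be framed as a small model of how a sink task's consumer chooses where to start. This is a simplified sketch, not Kafka's actual code, and it rests on assumptions: sink tasks consume as the group "connect-<connector name>"; Connect configures sink consumers with auto.offset.reset=earliest unless overridden; and, on pre-2.1 brokers, a committed offset is dropped offsets.retention.minutes after it was committed (to my knowledge the 0.10.x default is 1440 minutes, i.e. one day), independently of compaction. The offsets and dates in the usage lines are made up.

```python
from datetime import datetime, timedelta

# Assumption: on 0.10.x brokers a committed offset is dropped
# offsets.retention.minutes after it was committed (default 1440 = one day).
OFFSETS_RETENTION = timedelta(minutes=1440)


def sink_group_id(connector_name):
    """Consumer group a sink connector's tasks use: "connect-<connector name>"."""
    return f"connect-{connector_name}"


def start_position(committed, committed_at, now, log_start, log_end,
                   auto_offset_reset="earliest", retention=OFFSETS_RETENTION):
    """Offset a sink task's consumer would resume from in this simplified model.

    A committed offset wins while it has not expired; only when there is no
    usable committed offset does auto.offset.reset apply ("earliest" is the
    value assumed here for Connect's sink consumers).
    """
    if committed is not None and now < committed_at + retention:
        return committed
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    raise ValueError("no valid committed offset and auto.offset.reset=none")


# Hypothetical numbers: last commit at offset 5000, log start 0, log end 9000.
committed_at = datetime(2017, 6, 1, 12, 0)
print(sink_group_id("my-s3-sink"))  # connect-my-s3-sink
print(start_position(5000, committed_at, committed_at + timedelta(hours=2), 0, 9000))  # 5000
print(start_position(5000, committed_at, committed_at + timedelta(days=5), 0, 9000))   # 0
```

Under these assumptions, a five-day outage would outlive the committed offset, and the reset policy, not the stored position, would decide where the task resumes.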
I'm using mostly system defaults. I'm using a very simple custom partitioner to partition on an Avro datetime field rather than wall-clock time. That feature seems to have been added in Confluent v3.2.2, so I won't need a custom plugin for it once I upgrade. I'm hoping to skip v3.2.2 and go straight to v3.3.0 when it is available. My sink config JSON is as follows:
```json
{
"name": "my-s3-sink",
"tasks.max": 1,
"topics": "my-topic",
"flush.size": 10000,
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
"partitioner.class": "mycompany.partitioner.TimeFieldPartitioner",
"s3.bucket.name": "my-bucket",
"s3.region": "us-west-2",
"partition.field.name": "timestamp",
"locale": "us",
"timezone": "UTC",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"schema.compatibility": "NONE",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081"
}
```
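For reference, the partitioning idea described above (an S3 path built from the record's own timestamp field rather than wall-clock time) can be sketched as follows. This is not the code of mycompany.partitioner.TimeFieldPartitioner, which isn't shown; it assumes the "timestamp" field holds epoch milliseconds, uses UTC as in the config, and translates the Joda-style path.format into Python strftime codes.

```python
from datetime import datetime, timezone

# strftime equivalent of path.format "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH":
# quoted literals are kept as-is and the date fields map to %Y, %m, %d, %H.
PATH_FORMAT = "year=%Y/month=%m/day=%d/hour=%H"


def encoded_partition(record, field="timestamp"):
    """Build the time-based partition path from a record field instead of wall-clock time.

    Assumes the field holds epoch milliseconds and that paths use UTC,
    matching "timezone": "UTC" in the connector config.
    """
    ts = datetime.fromtimestamp(record[field] / 1000, tz=timezone.utc)
    return ts.strftime(PATH_FORMAT)


print(encoded_partition({"timestamp": 1497960000000}))  # year=2017/month=06/day=20/hour=12
```

Because the path depends only on the record, reprocessing the same records maps them to the same time partitions, whereas a wall-clock partitioner would place them under the time of reprocessing.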