
I am currently setting up a Kafka Connect sink to move data from Kafka to Google Cloud Storage (GCS).

Everything is working smoothly, except that the connector only consumes from the latest available offset. That is, once it starts running, it only sinks newly produced messages to GCS and skips the messages that already exist in Kafka. I have tried deleting the Kafka Connect storage/offset topics, creating a connector with a new name, etc., but it always starts at the latest offset.

Is there any way to configure the earliest offset for the Kafka Connect GCS sink? I have not seen any configuration option for this in

https://docs.confluent.io/current/connect/kafka-connect-gcs/configuration_options.html

or

https://docs.confluent.io/current/connect/references/allconfigs.html

I've tried deleting the Kafka Connect topics/file storage, as well as starting with a new connector name.

What I am seeing: the Kafka Connect sink only writes messages that were produced after the connector started.

What I am expecting/need: messages to sink from the earliest available offset, i.e. to start from the earliest message when no offset has been committed for the connector.


1 Answer


When you create a connector for the first time, it will consume from the earliest offset by default. You should see this in the Connect worker log:

[2019-08-05 23:31:35,405] INFO ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
…

You can override this by setting consumer.auto.offset.reset in the worker config.
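As a minimal sketch, assuming a distributed worker started from the stock connect-distributed.properties file (the file name is illustrative; restart the worker after changing it):

    # connect-distributed.properties (worker configuration)
    # Consumer properties for sink connectors take the "consumer." prefix.
    consumer.auto.offset.reset=earliest

Note that auto.offset.reset only applies when the consumer group has no committed offsets; it does not rewind a group that has already committed.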

When you delete a connector and recreate it with the same name, the committed offsets are retained and reused, because a sink connector's offsets are stored in a consumer group named after the connector.
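If you need a connector that keeps its existing name to re-read a topic, one option is to reset that consumer group (named connect-<connector-name>). This is a sketch using the standard kafka-consumer-groups tool; the group name and broker address are placeholders, and the connector must be stopped or deleted first, since offsets cannot be reset while the group has active members:

    # Inspect the committed offsets for the connector's consumer group
    kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
      --group connect-my-gcs-sink --describe

    # Reset the group to the earliest offset for all topics it consumes
    kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
      --group connect-my-gcs-sink \
      --reset-offsets --to-earliest --all-topics --execute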

If you create a connector with a new name, it will start from the offset reset policy set in the Connect worker (earliest by default).
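If the worker is running Kafka 2.3 or newer, the reset policy can also be overridden for a single connector rather than for the whole worker. This is a sketch and assumes the worker permits per-connector client overrides:

    # Worker configuration: allow connectors to override client properties
    connector.client.config.override.policy=All

    # Connector configuration: override auto.offset.reset for this connector only
    consumer.override.auto.offset.reset=earliest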