35
votes

Is it possible to change the start offset for a new topic? I would like to create a new topic and start reading from offset 10000. How?


4 Answers

30
votes

You can do this with the help of the ZooKeeper shell. Kafka 0.8.x uses ZooKeeper to track consumer offsets.

Go to the Kafka bin directory and invoke the ZooKeeper shell (my Kafka version is 0.8.0):

./zookeeper-shell.sh localhost:2181

Now use the ZooKeeper get command:

get /consumers/consumer_group_id/offsets/topic/0

It shows something like:

2043
cZxid = 0x4d
ctime = Wed Mar 18 03:56:32 EDT 2015
...

Here 2043 is the maximum offset consumed. Set it to the desired value with the ZooKeeper set command:

set /consumers/consumer_group_id/offsets/topic/0 10000

The path has the form /consumers/[consumer_group_id]/offsets/[topic]/[partition_id].
Substitute the appropriate consumer group, topic, and partition id.

*Also, since you mentioned it's a new instance of Kafka, I am not sure whether any consumers have connected and created consumer groups yet.
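The path layout above can be assembled mechanically. A minimal sketch, assuming the old ZooKeeper-based consumer; the helper zk_offset_path and the names my-group and my-topic are hypothetical, and the script only prints the command rather than touching ZooKeeper:

```shell
#!/bin/sh
# Hypothetical helper: build the ZooKeeper node path that holds the committed
# offset for one partition (old ZooKeeper-based consumer layout).
zk_offset_path() {
  # $1 = consumer group id, $2 = topic, $3 = partition id
  printf '/consumers/%s/offsets/%s/%s\n' "$1" "$2" "$3"
}

# Placeholder names; substitute your own group, topic and partition.
path=$(zk_offset_path my-group my-topic 0)

# Print the command to paste into zookeeper-shell.sh.
echo "set $path 10000"
```

It is probably safest to stop the consumers in the group before changing the value, since a running consumer may overwrite it on its next commit.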

91
votes

Since Kafka 0.11.0.0 you can use the script kafka-consumer-groups.sh. Example from this answer:

kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group my-group --reset-offsets --to-earliest --all-topics --execute

Other options are listed in KIP-122: Add Reset Consumer Group Offsets tooling:

.----------------------.-----------------------------------------------.----------------------------------------------------------------------------------------------------------------------------------------------.
|      Scenario        |                   Arguments                   |                                                                    Example                                                                   |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset to Datetime    |  --to-datetime YYYY-MM-DDTHH:mm:SS.sss±hh:mm  |  Reset to first offset since 1 January 2017, 00:00:00 hrs: --reset-offsets --group test.group --topic foo --to-datetime 2017-01-01T00:00:00Z |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset by Duration    |  --by-duration  PnDTnHnMnS                    |  Reset to first offset since one week ago (from current timestamp): --reset-offsets --group test.group --topic foo --by-duration P7D         |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset to Earliest    |  --to-earliest                                |  Reset to earliest offset available: --reset-offsets --group test.group --topic foo --to-earliest                                            |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset to Latest      |  --to-latest                                  |  Reset to latest offset available: --reset-offsets --group test.group --topic foo --to-latest                                                |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset to Offset      |  --to-offset                                  |  Reset to offset 1 in all partitions: --reset-offsets --group test.group --topic foo --to-offset 1                                           |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Shift Offset by 'n'  |  --shift-by n                                 |  Reset to current offset plus 5 positions: --reset-offsets --group test.group --topic foo --shift-by 5                                       |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset from File      |  --from-file PATH_TO_FILE                     |  Reset using a file with reset plan: --reset-offsets --group test.group --from-file reset-plan.csv                                           |
'----------------------'-----------------------------------------------'----------------------------------------------------------------------------------------------------------------------------------------------'
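For the --from-file scenario, the plan file can be generated with a small loop. This is only a sketch: it assumes the single-group CSV layout topic,partition,offset (worth checking against the KIP or the tool's --export output for your version), and the topic foo with partitions 0 to 2 is a placeholder:

```shell
#!/bin/sh
# Assumed inputs: topic "foo" with partitions 0..2, all reset to offset 10000.
TOPIC="foo"
OFFSET=10000
PLAN="reset-plan.csv"

# Start from an empty file, then write one line per partition
# in the assumed "topic,partition,offset" layout.
: > "$PLAN"
for partition in 0 1 2; do
  printf '%s,%s,%s\n' "$TOPIC" "$partition" "$OFFSET" >> "$PLAN"
done

# Show the generated plan.
cat "$PLAN"
```

The file would then be passed as --reset-offsets --group test.group --from-file reset-plan.csv, as in the last table row.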

You can also specify the partitions you want to reset, for example:

  • Reset offset of topic foo partition 0 to 1

    --reset-offsets --group test.group --topic foo:0 --to-offset 1

  • Reset offset of topic foo partition 0,1,2 to earliest

    --reset-offsets --group test.group --topic foo:0,1,2 --to-earliest

Reminder: don't forget the --execute flag (see the execution options in the KIP). Without it, the script does a dry run and only prints the offsets that would result for the given scope, for example:

TOPIC                 PARTITION NEW-OFFSET NEW-LAG LOG-END-OFFSET CONSUMER-ID HOST CLIENT-ID
foo                   0         90         10      100            -           -    -
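Applied to the original question (start reading at offset 10000), the preview and the real run could be kept in sync like this. A sketch only: the broker address, group, and topic names are assumptions, and the script prints the two commands instead of running them:

```shell
#!/bin/sh
# Assumed values, not from the original post.
BOOTSTRAP="localhost:9092"
GROUP="my-group"
TOPIC="my-topic"

# Build the shared arguments once so the preview and the real run
# use exactly the same scope and target offset.
reset_args="--bootstrap-server $BOOTSTRAP --group $GROUP --topic $TOPIC --reset-offsets --to-offset 10000"

# Step 1: preview the new offsets without changing anything.
echo "kafka-consumer-groups.sh $reset_args --dry-run"

# Step 2: apply the reset once the preview looks right.
echo "kafka-consumer-groups.sh $reset_args --execute"
```

Note that the tool refuses to reset offsets while the group still has active members, so the consumers need to be stopped first.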

Credits to this answer. Table created with ascii tables.

4
votes

Since Kafka 0.9, offsets are stored in a topic. To change the offset, use the seek() method:

public void seek(TopicPartition partition, long offset)

Overrides the fetch offsets that the consumer will use on the next poll(timeout). If this API is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that you may lose data if this API is arbitrarily used in the middle of consumption to reset the fetch offsets.

2
votes

If you need to change the offset:

kafka-consumer-groups --bootstrap-server {url} \
--topic {topic} \
--group {consumer} \
--reset-offsets --to-datetime 2020-11-11T00:00:00.000+0900 \
--execute

See also: Unparseable Date Error when parsing UTC string through SimpleDateFormat to Date
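Since the --to-datetime value is parsed strictly, it can help to generate it rather than type it. A small sketch that prints a start-of-day UTC timestamp in the YYYY-MM-DDTHH:mm:SS.sss shape; whether a zone suffix such as Z or +09:00 is required or accepted is an assumption to verify against your Kafka version:

```shell
#!/bin/sh
# Midnight today in UTC, with explicit milliseconds so the value has the
# YYYY-MM-DDTHH:mm:SS.sss shape.
ts=$(date -u +%Y-%m-%dT00:00:00.000)

# Print the argument pair to append to the kafka-consumer-groups command.
echo "--reset-offsets --to-datetime $ts"
```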