3
votes

Using the new Kafka Java consumer api, I run a single consumer to consume messages. When all available messages are consumed, I kill it with kill -15.

Now I would like to reset the offsets to start. I would like to avoid to just use a different consumer group. What I tried is the following sequence of calls, using the same group as the consumer that just had finished reading the data.

assign(topicPartition);
OffsetAndMetadata om = new OffsetAndMetadata(0);
commitSync(Collections.singletonMap(topicPartition, 0));

I thought I had got this working in a test, but now I always just get:

ERROR internals.ConsumerCoordinator: Error UNKNOWN_MEMBER_ID occurred while committing offsets for group queue
Exception in thread "main" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:552)

Is it in principle wrong to combine assign with commitSync, possibly because only subscribe and commitSync go together? The docs only say that assign does not go along with subscribe, but I thought this applies only in one consumer process. (In fact I was even hoping to run the offset-reset consumer while the other consumer is up, hoping that the other one might notice the offset change and start over again. But shutting it down first is fine too.)

Any ideas?

2

2 Answers

3
votes

Found the problem. The approach described in my question works well, given we respect the following conditions:

  1. There may be no other consumer running with the targeted group.id. Even if a consumer is subscribed only to other topics, this hinders committing topic offsets after calling assign() instead of subscribe().

  2. After the last other consumer has stopped, it takes 30 seconds (I think it is group.max.session.timeout.ms) before the operation can succeed. The indicative log message from kafka is

    Group X generation Y is dead and removed
    

Once this appears in the log, the sequence

assign(topicPartition);
OffsetAndMetadata om = new OffsetAndMetadata(0);
commitSync(Collections.singletonMap(topicPartition, 0));

can succeed.

0
votes

Why even commit offsets in the first place? Set enable.auto.commit to false in Properties and don't commit it at all if you just re-read all messages on restart.

To reset offset you can use for example these methods:

public void seek(TopicPartition partition, long offset)
public void seekToBeginning(TopicPartition... partitions)