1
votes

I know that in Sping-Kafka we have below methods:

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map assignments, ConsumerSeekCallback callback);

void onIdleContainer(Map assignments, ConsumerSeekCallback callback);

But which one it does the same thing like the native ConsumerRebalanceListener method onPartitionsRevoked?

"This method will be called before a rebalance operation starts and after the consumer stops fetching data. It is recommended that offsets should be committed in this callback to either Kafka or a custom offset store to prevent duplicate data."

If I want to implement the ConsumerRebalanceListener, how can I pass the KafkaConsumer reference? I only see the Consumer from Spring-Kafka.

=========update======

Hi,Gary when I add RebalanceListener this into the ContainerProperties. I can see that both methods get triggered. however, I got the exception, saying sth like "Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing" Do you have any idea?

=========== update 2 ============

    public ConcurrentMessageListenerContainer<Integer, String> createContainer(
      ContainerProperties containerProps, IKafkaConsumer iKafkaConsumer) {

    Map<String, Object> props = consumerProps();

    DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);

    **RebalanceListner rebalanceListner = new RebalanceListner(cf.createConsumer());**

    CustomKafkaMessageListener ckml = new CustomKafkaMessageListener(iKafkaConsumer, rebalanceListner);

    CustomRecordFilter cff = new CustomRecordFilter();

    FilteringAcknowledgingMessageListenerAdapter faml = new FilteringAcknowledgingMessageListenerAdapter(ckml, cff, true);

    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(5);

    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(1500); // 1.5 seconds

    RetryTemplate rt = new RetryTemplate();
    rt.setBackOffPolicy(backOffPolicy);
    rt.setRetryPolicy(retryPolicy);
    rt.registerListener(ckml);
    RetryingAcknowledgingMessageListenerAdapter rml = new RetryingAcknowledgingMessageListenerAdapter(faml, rt);

    containerProps.setConsumerRebalanceListener(rebalanceListner);
    containerProps.setMessageListener(rml);
    containerProps.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
    containerProps.setErrorHandler(ckml);
    containerProps.setAckOnError(false);
    ConcurrentMessageListenerContainer<Integer, String> container = new ConcurrentMessageListenerContainer<>(
        cf, containerProps);

    container.setConcurrency(1);
    return container;
  }
1

1 Answers

1
votes

You can add a RebalanceListener to the container's ContainerProperties that are passed into the constructor.