0
votes

I am using spring-kafka to poll message, when I use the annotation for the consumer and set offset to 0 it will see all messages from the earliest. But when I try to use a injected ConsumerFactory to create consumer on my own, then poll will only return a few message or no message at all. Is there some other config I need in order to be able to pull message? The poll timeout is already set to 10 seconds.

@Component
public class GenericConsumer {
  private static final Logger logger = LoggerFactory.getLogger(GenericConsumer.class);

  @Autowired
  ConsumerFactory<String, Record> consumerFactory;

  public ConsumerRecords<String, Record> poll(String topic, String group){
    logger.info("---------- Polling kafka recrods from topic " + topic + " group" + group);
    Consumer<String, Record> consumer = consumerFactory.createConsumer(group, "");
    consumer.subscribe(Arrays.asList(topic));
    // need to make a dummy poll before we can seek
    consumer.poll(1000);
    consumer.seekToBeginning(consumer.assignment());
    ConsumerRecords<String, Record> records;
    records = consumer.poll(10000);
    logger.info("------------ Total " + records.count() + " records polled");
    consumer.close();
    return records;
  }
}
1
What is max.poll.records set to? You might need to poll in a loop. - Gary Russell
I tried max.poll.records 500 and 5000, and I did tried in loop. But made no difference. - Ming

1 Answers

0
votes

It works fine for me, this was with boot 2.0.5, Spring Kafka 2.1.10 ...

@SpringBootApplication
public class So52284259Application implements ConsumerAwareRebalanceListener {

    private static final Logger logger = LoggerFactory.getLogger(So52284259Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So52284259Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template, GenericConsumer consumer) {
        return args -> {
//          for (int i = 0; i < 1000; i++) { // load up the topic on first run
//              template.send("so52284259", "foo" + i);
//          }
            consumer.poll("so52284259", "generic");
        };
    }

    @KafkaListener(id = "listener", topics = "so52284259")
    public void listen(String in) {
        if ("foo999".equals(in)) {
            logger.info("@KafkaListener: " + in);
        }
    }

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        consumer.seekToBeginning(partitions);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so52284259", 1, (short) 1);
    }

}

@Component
class GenericConsumer {

    private static final Logger logger = LoggerFactory.getLogger(GenericConsumer.class);

    @Autowired
    ConsumerFactory<String, String> consumerFactory;

    public void poll(String topic, String group) {
        logger.info("---------- Polling kafka recrods from topic " + topic + " group" + group);
        Consumer<String, String> consumer = consumerFactory.createConsumer(group, "");
        consumer.subscribe(Arrays.asList(topic));
        // need to make a dummy poll before we can seek
        consumer.poll(1000);
        consumer.seekToBeginning(consumer.assignment());
        ConsumerRecords<String, String> records;
        boolean done = false;
        while (!done) {
            records = consumer.poll(10000);
            logger.info("------------ Total " + records.count() + " records polled");
            Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
            while (iterator.hasNext()) {
                String value = iterator.next().value();
                if ("foo999".equals(value)) {
                    logger.info("Consumer: " + value);
                    done = true;
                }
            }
        }
        consumer.close();
    }

}

and

2018-09-12 09:35:25.929  INFO 61390 --- [           main] com.example.GenericConsumer              : ------------ Total 500 records polled
2018-09-12 09:35:25.931  INFO 61390 --- [           main] com.example.GenericConsumer              : ------------ Total 500 records polled
2018-09-12 09:35:25.932  INFO 61390 --- [           main] com.example.GenericConsumer              : Consumer: foo999
2018-09-12 09:35:25.942  INFO 61390 --- [ listener-0-C-1] com.example.So52284259Application        : @KafkaListener: foo999