I am using spring-kafka to poll messages. When I use the annotation for the consumer and set the offset to 0, it sees all messages from the earliest one. But when I use an injected ConsumerFactory to create a consumer on my own, poll returns only a few messages or none at all. Is there some other config I need in order to pull the messages? The poll timeout is already set to 10 seconds.
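
import java.util.Arrays;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Component;

@Component
public class GenericConsumer {

    private static final Logger logger = LoggerFactory.getLogger(GenericConsumer.class);

    // Record is my own value type
    @Autowired
    ConsumerFactory<String, Record> consumerFactory;

    public ConsumerRecords<String, Record> poll(String topic, String group) {
        logger.info("---------- Polling Kafka records from topic " + topic + " group " + group);
        // Create a consumer in the given group, with an empty client-id suffix
        Consumer<String, Record> consumer = consumerFactory.createConsumer(group, "");
        consumer.subscribe(Arrays.asList(topic));
        // need to make a dummy poll before we can seek
        consumer.poll(1000);
        consumer.seekToBeginning(consumer.assignment());
        ConsumerRecords<String, Record> records;
        // Single poll with a 10 second timeout
        records = consumer.poll(10000);
        logger.info("------------ Total " + records.count() + " records polled");
        consumer.close();
        return records;
    }
}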
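
One possible explanation, not confirmed against this setup, is that a single `poll` is not guaranteed to return everything in the topic. It returns at most `max.poll.records` records (500 by default) and only what has already been fetched. If the group has not finished assigning partitions when the dummy poll returns, `consumer.assignment()` is empty, so no assigned partition gets rewound. Below is a minimal sketch of an alternative, assuming a recent kafka-clients version that has `poll(Duration)`. The class name `TopicDrainer` and the method `readFromBeginning` are hypothetical names made up for illustration. It seeks to the beginning from a rebalance listener, then keeps polling until the consumer's position reaches the end offset of every assigned partition or the time budget runs out.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper, not part of spring-kafka or kafka-clients.
public final class TopicDrainer {

    private TopicDrainer() { }

    // Reads a topic from its earliest offset; the caller owns and closes the consumer.
    public static <K, V> List<ConsumerRecord<K, V>> readFromBeginning(
            Consumer<K, V> consumer, String topic, Duration maxWait) {

        // Rewind inside the listener, so the seek runs only once partitions are assigned.
        consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // nothing to clean up in this sketch
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                consumer.seekToBeginning(partitions);
            }
        });

        List<ConsumerRecord<K, V>> all = new ArrayList<>();
        long deadline = System.nanoTime() + maxWait.toNanos();

        // Poll repeatedly: each call returns at most one batch.
        while (System.nanoTime() < deadline) {
            ConsumerRecords<K, V> batch = consumer.poll(Duration.ofMillis(500));
            batch.forEach(all::add);
            if (!consumer.assignment().isEmpty() && caughtUp(consumer)) {
                break;
            }
        }
        return all;
    }

    // True once the position on every assigned partition has reached its current end offset.
    private static boolean caughtUp(Consumer<?, ?> consumer) {
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumer.assignment());
        for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
            if (consumer.position(entry.getKey()) < entry.getValue()) {
                return false;
            }
        }
        return true;
    }
}
```

With the factory from the question, this could be called as `TopicDrainer.readFromBeginning(consumer, topic, Duration.ofSeconds(10))` on a consumer obtained from `consumerFactory.createConsumer(group, "")`, closing the consumer afterwards. The sketch collects records into a list rather than returning a single `ConsumerRecords`, since the result may span several batches.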