I don't want to use @KafkaListener or @StreamListener but I want to manually poll kafka. I am using spring-cloud-starter-stream-kafka library and I have the following Kafka Producer
@Autowired
private KafkaTemplate<byte[], byte[]> template;
public void sendMessages() {
IntStream.range(2)
.forEach(val -> {
template.send("kafka-topic", "hello".getBytes());
});
}
I would like to manually poll the same kafka topic using spring-kafka. I tried the following consumer
@Autowired
private ConsumerFactory consumerFactory;
public void processKafkaRecords() throws InterruptedException {
Consumer<byte[], byte[]> consumer = consumerFactory.createConsumer("0", "consumer-1");
consumer.subscribe(Arrays.asList("kafka-topic"));
ConsumerRecords<byte[], byte[]> poll = consumer.poll(Duration.ofMillis(1000));
poll.forEach(record -> {
log.info("record {}", record);
});
}
application.properties
spring.cloud.stream.bindings.pollableInput.destination=kafka-topic
spring.cloud.stream.bindings.pollableInput.group=kafka-topic
spring.cloud.stream.bindings.pollableInput.consumer.batch-mode=true
spring.cloud.stream.bindings.pollableInput.consumer.header-mode=none
spring.cloud.stream.bindings.pollableInput.consumer.use-native-decoding=true
spring.cloud.stream.kafka.bindings.pollableInput.consumer.autoCommitOffset=false
However, the consumer never gets any records which the producer had sent. Any ideas how to manually poll a kafka topic?
@Autowired
wouldn't work. If your Kafka is on localhost; you are probably getting Boot's default. You need to addspring.kafka.consumer.auto-offset-reset=earliest
- see the Spring Boot Kafka documentation chapter for more information. You also need to wait for your subscription to occur via aRebalanceListener
(or useconsumer.assign()
instead ofsubscribe()
). – Gary Russell