0
votes

I don't want to use @KafkaListener or @StreamListener; I want to poll Kafka manually. I am using the spring-cloud-starter-stream-kafka library and have the following Kafka producer:

  @Autowired
  private KafkaTemplate<byte[], byte[]> template;

  public void sendMessages() {
    IntStream.range(0, 2)
             .forEach(val -> {
               template.send("kafka-topic", "hello".getBytes());
             });
  }

I would like to manually poll the same Kafka topic using spring-kafka. I tried the following consumer:

  @Autowired
  private ConsumerFactory<byte[], byte[]> consumerFactory;

  public void processKafkaRecords() throws InterruptedException {
    Consumer<byte[], byte[]> consumer = consumerFactory.createConsumer("0", "consumer-1");
    consumer.subscribe(Arrays.asList("kafka-topic"));
    ConsumerRecords<byte[], byte[]> poll = consumer.poll(Duration.ofMillis(1000));
    poll.forEach(record -> {
      log.info("record {}", record);
    });
  }

application.properties

spring.cloud.stream.bindings.pollableInput.destination=kafka-topic
spring.cloud.stream.bindings.pollableInput.group=kafka-topic
spring.cloud.stream.bindings.pollableInput.consumer.batch-mode=true
spring.cloud.stream.bindings.pollableInput.consumer.header-mode=none
spring.cloud.stream.bindings.pollableInput.consumer.use-native-decoding=true

spring.cloud.stream.kafka.bindings.pollableInput.consumer.autoCommitOffset=false

However, the consumer never receives any of the records that the producer sent. Any ideas on how to manually poll a Kafka topic?

1
You need to show your consumer factory configuration and application.yml/properties. - Gary Russell
How do you trigger the consumer? And why are you using stream and then posting with a normal producer and polling with a consumer? - daniu
@GaryRussell I don't have a consumer factory configuration; I assume it will use the default one. I simply bind it to the Kafka topic. Is this not enough? Is there a simple example where I can see a simple setup? Thank you. - george
@daniu The consumer is a Spring bean; I trigger it manually. I use Spring Cloud Stream because I've been using @StreamListener, but now I want to start polling manually. - george
Well, you are getting a consumer factory from somewhere; otherwise the @Autowired wouldn't work. If your Kafka is on localhost, you are probably getting Boot's default. You need to add spring.kafka.consumer.auto-offset-reset=earliest - see the Spring Boot Kafka documentation chapter for more information. You also need to wait for your subscription to occur via a RebalanceListener (or use consumer.assign() instead of subscribe()). - Gary Russell

1 Answer

2
votes

There could be several reasons:

  1. Duration.ofMillis(1000) - Try a longer timeout. One second can be too short unless both your client and Kafka are running on the same machine, because the documentation of poll(Duration) says: "If the timeout expires, an empty record set will be returned."
  2. If you started the producer first and the consumer afterwards, and you did not set the offset reset strategy to earliest, you will not see any messages, because a consumer with no committed offset starts from the latest offset by default. So, try setting auto.offset.reset=earliest.
  3. Another consumer from the same consumer group might be running while the topic has only 1 partition, or the consumer group might already be at the last offset. In that case, try changing the consumer group id.
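
The three points above can be sketched in plain Java. This is only a rough illustration, not the Spring-provided setup: the class name ManualPollSketch, both method names, and the retry count are hypothetical. The property keys are the standard Kafka consumer configuration names, while the values (earliest, a fresh group id, byte-array deserializers) are assumptions chosen to match the question. The retry helper takes a Supplier standing in for one consumer.poll(...) call, so it can be read and run without a broker.

```java
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.function.Supplier;

// Hypothetical helper class; names are illustrative and not part of spring-kafka.
final class ManualPollSketch {

    // Builds consumer settings that address points 2 and 3:
    // read from the earliest offset and use a caller-chosen (ideally fresh) group id.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Applies only when the group has no committed offset for the partition.
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    // Addresses point 1: a single poll can come back empty, so call pollOnce
    // up to maxAttempts times and return the first non-empty batch.
    // Returns an empty list if every attempt was empty.
    static <T> List<T> pollUntilRecords(Supplier<List<T>> pollOnce, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            List<T> batch = pollOnce.get();
            if (!batch.isEmpty()) {
                return batch;
            }
        }
        return Collections.emptyList();
    }
}
```

With a real client, the properties could be passed to new KafkaConsumer<>(props), and pollOnce would wrap a call such as consumer.poll(Duration.ofSeconds(5)) and copy the returned records into a list. Retrying also gives the group rebalance triggered by subscribe() time to complete, which the first poll alone may not.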