
I am trying to write a Kafka consumer without using @KafkaListener, and below is the code I am using for configuring the listener:

@Configuration
@EnableKafka
public class KafkaConfig {

  @Value("${kafka.bootstrap-servers}")
  private String bootstrapServers;

  @Bean
  public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    // list of host:port pairs used for establishing the initial connections to the Kafka cluster
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // allows a pool of processes to divide the work of consuming and processing records
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "org");
    // automatically reset the offset to the earliest offset
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return props;
  }

  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  }

  @Bean
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ContainerProperties containerProperties = new ContainerProperties("in.t");
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
  }

  @Bean
  public Consumer receiver() {
    return new Consumer();
  }
}

How can I configure the topic and the listener method here, given that my consumer class can have multiple methods?

Also, I wanted to know if there are any potential issues when using @KafkaListener with Kafka Streams.

PS: I don't want to use @KafkaListener.

Why don't you want to use the Kafka listener? - Deadpool
Because it doesn't work with streams. - Rajeev Akotkar

2 Answers

  1. The @KafkaListener doesn't work with Kafka Streams; it is for a plain Consumer. A Kafka Stream can be managed via a StreamsBuilderFactoryBean and a particular @Bean for the KStream.

  2. If you don't want to use @KafkaListener, then you need to go the route of manual KafkaListenerContainer creation. The KafkaListenerContainerFactory can be used for that purpose, but only since Spring Kafka 2.2, and definitely not with Spring Boot.
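The StreamsBuilderFactoryBean approach from point 1 can be sketched as follows; with @EnableKafkaStreams, Spring Kafka creates the StreamsBuilderFactoryBean for you, and the KStream @Bean defines the topology. The topic names, application id, and bootstrap address here are assumptions for illustration:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    // The configuration consumed by the auto-created StreamsBuilderFactoryBean
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app");      // assumed id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }

    // The topology: read from "in.t", transform, write to "out.t" (both assumed names)
    @Bean
    public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<String, String> stream = streamsBuilder.stream("in.t");
        stream.mapValues(String::toUpperCase).to("out.t");
        return stream;
    }
}
```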

So, you have no choice but to create a ConcurrentMessageListenerContainer manually. There, via ContainerProperties, you can inject a target MessageListener. For your custom Consumer POJO you need to consider wrapping it into a RecordMessagingMessageListenerAdapter, and only that adapter is injected into the ConcurrentMessageListenerContainer.

That's how things work underneath @KafkaListener.
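As a minimal sketch of that manual wiring, assuming the question's topic "in.t" and a hypothetical Consumer POJO with a consume(String) method; a plain MessageListener lambda stands in here for the RecordMessagingMessageListenerAdapter wrapping, for brevity:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;

@Configuration
public class ManualContainerConfig {

    @Bean
    public ConcurrentMessageListenerContainer<String, String> listenerContainer(
            ConsumerFactory<String, String> consumerFactory) {
        ContainerProperties containerProperties = new ContainerProperties("in.t");
        // Inject the target listener via ContainerProperties; for a POJO handler
        // method, a RecordMessagingMessageListenerAdapter would go here instead.
        containerProperties.setMessageListener(
            (MessageListener<String, String>) record -> receiver().consume(record.value()));
        ConcurrentMessageListenerContainer<String, String> container =
            new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);
        container.setConcurrency(1);
        return container; // started automatically when the application context refreshes
    }

    @Bean
    public Consumer receiver() {
        return new Consumer(); // the question's POJO; consume(String) is assumed
    }
}
```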


With Spring Boot and Spring Kafka it's possible to create consumers without @KafkaListener and without creating additional explicit Kafka beans (ConcurrentMessageListenerContainer, etc.):

  1. application.yml - create the connection configs; they will be injected into KafkaProperties:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: "org.apache.kafka.common.serialization.StringSerializer"
      value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
    listener:
      type: BATCH
    consumer:
      clientId: "applicationClientId"
      groupId: "applicationGroup"
      keyDeserializer: "org.apache.kafka.common.serialization.StringDeserializer"
      valueDeserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
      maxPollRecords: 50
      fetchMinSize: 50
      autoOffsetReset: earliest
      properties:
        spring:
          json:
            trusted:
              packages: "*"
  2. Based on KafkaProperties, Spring Boot will create the necessary beans, so it's only required to create the endpoints that actually process the topic data.
@Configuration
@EnableKafka
@RequiredArgsConstructor //Lombok annotation
public class KafkaConsumerConfig implements KafkaListenerConfigurer {

    private final BeanFactory beanFactory;
    private final SomethingApplicationService somethingApplicationService;

    @SneakyThrows //Lombok annotation
    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        // The following may be in a loop with different services and topics
        val endpoint = new MethodKafkaListenerEndpoint<String, Data>();

        endpoint.setBeanFactory(beanFactory);
        endpoint.setBean(somethingApplicationService);
        endpoint.setMethod(somethingApplicationService.getClass().getDeclaredMethod("processList", List.class));
        endpoint.setId("Unique id for this endpoint");
        endpoint.setTopics("topic1", ...);
        endpoint.setMessageHandlerMethodFactory(messageHandlerMethodFactory());

        registrar.registerEndpoint(endpoint);
    }

    @Bean
    public MessageHandlerMethodFactory messageHandlerMethodFactory() {
        return new DefaultMessageHandlerMethodFactory();
    }

}
  3. Target service and data object:
@Service
public class SomethingApplicationService {

    public void processList(List<Data> list) {
        //processing logic
    }

}

@Data //Lombok annotation
public class Data {
    private String name;
    private String value;
}