4 votes

I am not able to listen to a Kafka topic (in my case, 2 topics) when there are multiple consumers. In the example below, I have 2 consumer factories that accept 2 different JSON messages (one of type User, the other of type Event). The two message types are posted to different topics. When I try to consume the Event messages from topic1, I cannot, but I can consume the messages on the User topic.

Ex:

@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {

    @Autowired
    private Environment environment;

    @Bean
    public ConsumerFactory<String, User> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("bootstrap.servers"));
        config.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("user.consumer.group"));
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
                new JsonDeserializer<>(User.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, User> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, Event> consumerFactoryEvent() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("bootstrap.servers"));
        config.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("event.consumer.group"));
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
                new JsonDeserializer<>(Event.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Event> kafkaListenerContainerFactoryEvent() {
        ConcurrentKafkaListenerContainerFactory<String, Event> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryEvent());
        return factory;
    }
}

My main application is like below:

@KafkaListener(topics = "${event.topic}")
public void processEvent(Event event) {
..do something..
..post the message to User topic
}
@KafkaListener(topics = "${user.topic}")
public void processUser(User user) {
..do something..
}

My need is to listen on the event topic first, do some massaging of the message, and then send it to the User topic; another method listens on the User topic and does something with that message. I tried passing different options to @KafkaListener, such as

@KafkaListener(topics="${event.topic}",containerFactory="kafkaListenerContainerFactoryEvent")

but it is not working. I am not sure what's going wrong. Any suggestion is helpful!
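
For reference, the forwarding step I have in mind would look roughly like the sketch below; the KafkaTemplate<String, User> bean and the mapToUser() helper are only placeholders for illustration, not existing code.

@Value("${user.topic}")
private String userTopic;

@Autowired
private KafkaTemplate<String, User> kafkaTemplate; // assumes a matching producer config exists

@KafkaListener(topics = "${event.topic}", containerFactory = "kafkaListenerContainerFactoryEvent")
public void processEvent(Event event) {
    User user = mapToUser(event);        // hypothetical "massaging"/mapping step
    kafkaTemplate.send(userTopic, user); // forward the result to the User topic
}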

3
Try defining two different bean names for the two listeners and reference those names in the @KafkaListener annotation. – Deadpool

3 Answers

4 votes

If you do not specify a name in @Bean, the method name becomes the bean name. Add that bean name, together with a groupId, to the @KafkaListener:

@KafkaListener(topics="${event.topic}",containerFactory="kafkaListenerContainerFactoryEvent", groupId="")

@KafkaListener(topics="${event.topic}",containerFactory="kafkaListenerContainerFactory", groupId="")

or

Specify a name in @Bean and reference that name in @KafkaListener:

@Bean(name="kafkaListenerContainerFactoryEvent")
public ConcurrentKafkaListenerContainerFactory<String, Event> kafkaListenerContainerFactoryEvent() {
ConcurrentKafkaListenerContainerFactory<String, Event> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactoryEvent());
return factory;
}
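
With the named bean in place, the corresponding listener can point at it explicitly. A minimal sketch (the group name eventGroup is just an illustrative value):

@KafkaListener(topics = "${event.topic}",
               containerFactory = "kafkaListenerContainerFactoryEvent",
               groupId = "eventGroup")
public void processEvent(Event event) {
    // handle the Event payload deserialized by the Event-specific factory
}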
2 votes

This is not easily found in any of the docs.

Here I am taking the example of consuming messages from:

topic=topic1 with bootstrap server=url1 (JSON serializer and deserializer)

topic=topic2 with bootstrap server=url2 (Avro serializer and deserializer)

Step 1:

@Bean
public ConsumerFactory<String, String> consumerFactory1() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost1:9092"); // This is a dummy value
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConsumerFactory<String, Object> consumerFactory2() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost2:9092"); // This is a dummy value
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put("schema.registry.url", "https://abc.schemaregistery.example.com"); // Again, a dummy value
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean(name = "kafkaListenerContainerFactory1")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory1() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory1());
    return factory;
}

@Bean(name = "kafkaListenerContainerFactory2")
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory2() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory2());
    return factory;
}

Step 2:

  • Add @SpringBootApplication(exclude = KafkaAutoConfiguration.class) so that the values defined under spring.kafka in the yml or properties file (@ConfigurationProperties) are not read, as sketched below.
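
A minimal sketch of what that exclusion looks like on the main class (the class name is illustrative):

@SpringBootApplication(exclude = KafkaAutoConfiguration.class)
public class MultiClusterConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(MultiClusterConsumerApplication.class, args);
    }
}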

Step 3:

@KafkaListener(
        topics = "topic1",
        containerFactory = "kafkaListenerContainerFactory1",
        groupId = "com.groupid1")
public void receiveFromTopic1(ConsumerRecord consumerRecord) throws InterruptedException {
    LOGGER.info("consuming from topic1 {}", consumerRecord.value());
    Thread.sleep(1000000); // For testing
}

@KafkaListener(
        topics = "topic2",
        containerFactory = "kafkaListenerContainerFactory2",
        groupId = "com.groupid2")
public void receiveFromTopic2(ConsumerRecord consumerRecord) throws InterruptedException {
    LOGGER.info("consuming from topic2 {}", consumerRecord.value());
    Thread.sleep(1000000); // For testing
}
0 votes

Too late to answer, obviously. But it might help someone else.


You don't need to create multiple ConsumerFactory beans. You can do it without specifying the class (User or Event) in the config, i.e. without new JsonDeserializer<>(Event.class), by adding the trusted packages instead.

@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("bootstrap.servers"));
    config.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("user.consumer.group"));
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

    // TODO: Remove "*" and add a specific package name
    config.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); // <<-- New config added

    return new DefaultKafkaConsumerFactory<>(config);
}
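
For completeness, a single container factory wired to this consumer factory would look roughly like the sketch below; kafkaListenerContainerFactory is the default bean name @KafkaListener looks up, so the listeners further down don't need a containerFactory attribute:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}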

While receiving the record:

@KafkaListener(topics="${event.topic}")
void receiveUserRecord(User record){ ... } # For User POJO

@KafkaListener(topics="${event.topic}")
void receiveEventRecord(Event record){ ... } # For Event POJO
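
This works because Spring Kafka's JsonSerializer adds a type header (__TypeId__) to each record, which JsonDeserializer then uses to pick the target class. A minimal producer config sketch under that assumption (bean and property names are illustrative):

@Bean
public ProducerFactory<String, Object> producerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("bootstrap.servers"));
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); // writes the __TypeId__ header
    return new DefaultKafkaProducerFactory<>(config);
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}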