0
votes

I'm new to Kafka and while trying a sample scenario where a Kafka Producer sends a user Details in JSON format to a Consumer. I've visited similar questions but I couldn't get the answer I needed.

I don't face any problem if I run any of the Producer or Consumer in a terminal and the other in spring boot. The error occurs, in infinite loop (when both producer and consumer are started from different spring boot projects):

Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1427) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1124) ~[spring-kafka-2.6.7.jar:2.6.7]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition Example3-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'edu.kafka.producer.model.User' is not in the trusted packages: [java.util, java.lang, edu.consumer.test.model, edu.consumer.test.model.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:126) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:100) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:504) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1365) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:130) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1596) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1283) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[kafka-clients-2.6.0.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1271) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1162) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075) ~[spring-kafka-2.6.7.jar:2.6.7]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

I've mentioned the, deserialization and trusted packages in consumer configuration, below:

@EnableKafka
@Configuration
public class TestConfig {

    @Bean
    public ConsumerFactory<String, User> consumerFactory() {
        
        Map<String, Object> config = new HashMap<>();
        
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "edu.kafka.producer.model.User, java.util, java.lang, edu.consumer.test.model, edu.consumer.test.model.*" );
        
        return new DefaultKafkaConsumerFactory<String, User>(config, new StringDeserializer(), new JsonDeserializer<>(User.class));
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, User> kafkaLister() {
        
        ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
        
        factory.setConsumerFactory(consumerFactory());
        
        return factory;
    }
    
}

I believe I'm missing something in configurations. I wanted to print the receiving message from Kafka into my Spring Boot console (I understand printing in console isn't recommended, this is a practice project), Below is the listener for consumer:

@Service
public class TestListener {

    @KafkaListener(topics = "Example3", groupId = "group_json", containerFactory = "kafkaLister")
    public void post(User user) {
        
        System.out.println("Consumed Message: " + user);
    }
    
}

The JSON I'm trying to consume:

{"name":"qaz","dept":"Aero"}

Spring version: 2.4.4

Kafka version (according to console): 2.6.7

Thank you so much in advance.

2

2 Answers

1
votes

Caused by: java.lang.IllegalArgumentException: The class 'edu.kafka.producer.model.User' is not in the trusted packages: [java.util, java.lang, edu.consumer.test.model, edu.consumer.test.model.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).

Looks like the deserializer is getting its properties from somewhere else.

config.put(JsonDeserializer.TRUSTED_PACKAGES, "edu.kafka.producer.model.User, java.util, java.lang, edu.consumer.test.model, edu.consumer.test.model.*" );

'edu.kafka.producer.model.User'

You are trying to deserialize a ...producer.model.User not a ...consumer.model.User

The ...producer... is coming from type information in headers; if you want to map a ...producer... object to a ...consumer... object, you need to configure type mapping as described in the documentation.

If you are only deserializing User objects, you can set use type info to false and set the default value type. See the configuration options...

https://docs.spring.io/spring-kafka/docs/current/reference/html/#serdes-json-config

Configuration Properties

JsonSerializer.ADD_TYPE_INFO_HEADERS (default true): You can set it to false to disable this feature on the JsonSerializer (sets the addTypeInfo property).

JsonSerializer.TYPE_MAPPINGS (default empty): See Mapping Types.

JsonDeserializer.USE_TYPE_INFO_HEADERS (default true): You can set it to false to ignore headers set by the serializer.

JsonDeserializer.REMOVE_TYPE_INFO_HEADERS (default true): You can set it to false to retain headers set by the serializer.

JsonDeserializer.KEY_DEFAULT_TYPE`: Fallback type for deserialization of keys if no header information is present.

JsonDeserializer.VALUE_DEFAULT_TYPE: Fallback type for deserialization of values if no header information is present.

JsonDeserializer.TRUSTED_PACKAGES (default java.util, java.lang): Comma-delimited list of package patterns allowed for deserialization. * means deserialize all.

JsonDeserializer.TYPE_MAPPINGS (default empty): See Mapping Types.

JsonDeserializer.KEY_TYPE_METHOD (default empty): See Using Methods to Determine Types.

JsonDeserializer.VALUE_TYPE_METHOD (default empty): See Using Methods to Determine Types.

The default type's package is always trusted.

config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);

The key and value deserializer have to be the ErrorHandlingDeserializer. You still have the native deserializers there.

0
votes

Based on Mr. Gary Russell's answer below is the configuration which resolved the issue

Producer Configuration:

@Bean
public ProducerFactory<String, User> producerFactory() {
    Map<String, Object> config = new HashMap<>();

    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
    config.put(JsonSerializer.TYPE_MAPPINGS, "user:edu.kafka.test.model.User");

    return new DefaultKafkaProducerFactory<>(config);
}


@Bean
public KafkaTemplate<String, User> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

Consumer Configuration:

@Configuration
@EnableKafka
public class TestConfig {

    @Bean
    public ConsumerFactory<String, User> consumerFactory() {
        
        Map<String, Object> config = new HashMap<>();
        
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(JsonSerializer.TYPE_MAPPINGS, "user:edu.kafka.test.model.User");
        config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "edu.kafka.test.model.User");
        config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        
        return new DefaultKafkaConsumerFactory<String, User>(config, new StringDeserializer(), new JsonDeserializer<>(User.class));
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, User> kafkaLister() {
        
        ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
        
        factory.setMissingTopicsFatal(false);
        
        factory.setConsumerFactory(consumerFactory());
        
        return factory;
    }
    
}