4
votes

I am using a kafka produer and a Spring kafka consumer. I am using a Json serializer and deserializer. Whenever I try to read messages in the consumer from the topic i get the following error:

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition fan_topic-0 at offset 154. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided

I have not configured anything about headers neither in the producer nor in the consumer. What am i missing here?

2

2 Answers

3
votes

I believe that you are missing the fact that JsonDeserializer has to be configured on the ConsumerFactory with an appropriate default type to deserialize, but not in the Kafka properties.

All the info is presented in the Docs: https://docs.spring.io/spring-kafka/docs/2.1.7.RELEASE/reference/html/_reference.html#serdes

1
votes

just adding to above answer,

The below changes solved for me.

config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);

adding

return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(String.class));

instead of

return new DefaultKafkaConsumerFactory<String, String>(config);

For reference,

the below method in deserialize expecting the headers and "Assert.state.." throws IllegalStateException

 @Override
        public T deserialize(String topic, Headers headers, byte[] data) {
            JavaType javaType = this.typeMapper.toJavaType(headers);
            if (javaType == null) {
                Assert.state(this.targetType != null, "No type information in headers and no default type provided");
                return deserialize(topic, data);
            }
            else {
                try {
                    return this.objectMapper.readerFor(javaType).readValue(data);
                }
                catch (IOException e) {
                    throw new SerializationException("Can't deserialize data [" + Arrays.toString(data) +
                            "] from topic [" + topic + "]", e);
                }
            }
        }