
This question is specific to Spring Kafka and is related to "Apache Kafka with High Level Consumer: Skip corrupted messages".

Is there a way to configure a Spring Kafka consumer to skip a record that cannot be read or processed (i.e., a corrupt record)?

I am seeing a situation where the consumer gets stuck on the same record when that record cannot be deserialized. This is the error the consumer throws:

Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of java.time.LocalDate: no long/Long-argument constructor/factory method to deserialize from Number value 

The consumer polls the topic and keeps printing the same error in a loop until the program is killed.

This happens in a @KafkaListener whose consumer factory has the following configuration:

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
I believe the consumer never gets stuck on one record; if you see that error message multiple times, it means it happened to multiple records. – Deadpool
Can you update the question with the JSON payload and the model/mapper class? – Deadpool
@Deadpool The consumer is stuck on the same message. I verified this through the logs and the consumer group's offset. I resolved the JSON deserialization error, but the aim of my question was to find a way to skip this message altogether. – Shankar P S

2 Answers


You need the ErrorHandlingDeserializer: https://docs.spring.io/spring-kafka/docs/2.2.0.RELEASE/reference/html/_reference.html#error-handling-deserializer
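For illustration, here is a minimal sketch of consumer properties that put the ErrorHandlingDeserializer in front of JsonDeserializer. It assumes Spring Kafka 2.5 or later, where the class is named ErrorHandlingDeserializer (versions 2.2 to 2.4 call it ErrorHandlingDeserializer2). The property names are written as plain strings so the sketch compiles without spring-kafka on the classpath; in a real project you would use the ConsumerConfig, ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS and JsonDeserializer.TRUSTED_PACKAGES constants instead. The class ErrorHandlingConsumerProps and the package com.example.model are hypothetical placeholders.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds consumer properties that wrap JsonDeserializer
// in Spring Kafka's ErrorHandlingDeserializer. String keys/class names are used
// so this compiles without spring-kafka; Kafka accepts class names as strings.
final class ErrorHandlingConsumerProps {

    // Class name as of Spring Kafka 2.5+; 2.2 to 2.4 use ErrorHandlingDeserializer2.
    static final String ERROR_HANDLING_DESERIALIZER =
            "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer";
    static final String JSON_DESERIALIZER =
            "org.springframework.kafka.support.serializer.JsonDeserializer";

    private ErrorHandlingConsumerProps() {
    }

    static Map<String, Object> consumerProps(String bootstrapServers, String trustedPackages) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // The outer deserializer catches failures thrown by its delegate...
        props.put("value.deserializer", ERROR_HANDLING_DESERIALIZER);
        // ...and this key (ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS) names the delegate.
        props.put("spring.deserializer.value.delegate.class", JSON_DESERIALIZER);
        // JsonDeserializer.TRUSTED_PACKAGES: packages the delegate may instantiate.
        props.put("spring.json.trusted.packages", trustedPackages);
        return props;
    }

    public static void main(String[] args) {
        // "com.example.model" is a placeholder for your own model package.
        consumerProps("localhost:9092", "com.example.model")
                .forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

With this setup, a record that fails to deserialize should no longer make poll() throw. Instead, the failure is handed to the listener container's error handler, which can log it (or publish it to a dead-letter topic) and let the consumer continue past that offset.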

If you can't move to version 2.2, consider implementing your own deserializer that returns null for records that can't be deserialized.
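A minimal sketch of that pre-2.2 workaround: a wrapper that delegates to a real deserializer and turns any failure into a null value. To keep it self-contained, it defines a hypothetical SimpleDeserializer interface that stands in for org.apache.kafka.common.serialization.Deserializer, reduced to its deserialize(String, byte[]) method; NullOnErrorDeserializer and NullOnErrorDemo are illustrative names, not part of either library.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for Kafka's Deserializer<T>, reduced to one method.
interface SimpleDeserializer<T> {
    T deserialize(String topic, byte[] data);
}

// Wraps a delegate and converts any runtime failure into a null value.
// The error is logged rather than rethrown, so the consumer's poll()
// does not fail on the same record again.
final class NullOnErrorDeserializer<T> implements SimpleDeserializer<T> {

    private final SimpleDeserializer<T> delegate;

    NullOnErrorDeserializer(SimpleDeserializer<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // null payload (e.g. a tombstone): nothing to parse
        }
        try {
            return delegate.deserialize(topic, data);
        } catch (RuntimeException e) {
            System.err.println("Skipping undeserializable record on " + topic + ": " + e);
            return null;
        }
    }
}

final class NullOnErrorDemo {
    public static void main(String[] args) {
        // Delegate that parses the payload as an integer and throws on bad input.
        SimpleDeserializer<Integer> ints = (topic, data) ->
                Integer.valueOf(new String(data, StandardCharsets.UTF_8));
        SimpleDeserializer<Integer> safe = new NullOnErrorDeserializer<>(ints);
        System.out.println(safe.deserialize("t", "42".getBytes(StandardCharsets.UTF_8)));
        System.out.println(safe.deserialize("t", "oops".getBytes(StandardCharsets.UTF_8)));
    }
}
```

Running the demo prints 42 for the valid payload and null for the corrupt one, after logging the NumberFormatException. Your listener then has to tolerate null values.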

The source code is here: https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ErrorHandlingDeserializer2.java


If you are using an older version of Spring Kafka, set the following consumer factory configuration for your @KafkaListener:

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomDeserializer.class);

Here is the code for CustomDeserializer:

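import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

import com.fasterxml.jackson.databind.ObjectMapper;

public class CustomDeserializer implements Deserializer<Object>
{
    // ObjectMapper is thread-safe once configured, so create it once
    // instead of once per record.
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void configure( Map<String, ?> configs, boolean isKey )
    {
        // nothing to configure
    }

    @Override
    public Object deserialize( String topic, byte[] data )
    {
        if ( data == null )
        {
            // null payload (e.g. a tombstone): nothing to parse
            return null;
        }
        try
        {
            // Object.class lets Jackson map any JSON (objects become Maps, arrays become Lists)
            return mapper.readValue(data, Object.class);
        }
        catch ( Exception exception )
        {
            // Swallow the error and return null so poll() does not throw and the
            // consumer moves past the corrupt record instead of re-reading it.
            System.out.println("Error in deserializing bytes " + exception);
            return null;
        }
    }

    @Override
    public void close()
    {
    }
}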

Since I want my code to be generic enough to read any kind of JSON, I deserialize into Object.class (mapper.readValue(data, Object.class)). And because the exception is caught inside the deserializer, the record is returned as null instead of failing the poll, so it won't be retried once read.
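To make the "won't be retried" point concrete, here is a toy model (plain Java, not the Kafka client API) of a poll loop that fetches one record per poll and advances its position only when the poll succeeds. This is a simplification of the real consumer, which fetches batches, but it reproduces the symptom from the question: a deserializer that throws pins the loop to one offset, while one that catches the error and returns null lets the position move on. PollLoopModel, strict and lenient are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy model (not the Kafka client) of a consumer's poll loop.
final class PollLoopModel {

    // Returns the values delivered to the "listener"; maxPolls bounds the
    // loop so the stuck case terminates.
    static List<String> run(List<String> log, Function<String, String> deserializer, int maxPolls) {
        List<String> delivered = new ArrayList<>();
        int position = 0; // next offset to fetch
        for (int poll = 0; poll < maxPolls && position < log.size(); poll++) {
            try {
                String value = deserializer.apply(log.get(position));
                delivered.add(value); // null for a record that was skipped
                position++;           // position advances only after a successful poll
            } catch (RuntimeException e) {
                // The poll failed and the position is unchanged,
                // so the next poll fetches the same record again.
                System.out.println("poll " + poll + " failed at offset " + position + ": " + e.getMessage());
            }
        }
        return delivered;
    }

    // Throws on "corrupt" input, like the JsonDeserializer in the question.
    static String strict(String raw) {
        if (raw.startsWith("bad")) {
            throw new IllegalArgumentException("cannot deserialize " + raw);
        }
        return raw.toUpperCase();
    }

    // Catches the error and returns null, like CustomDeserializer.
    static String lenient(String raw) {
        try {
            return strict(raw);
        } catch (RuntimeException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        List<String> log = List.of("a", "bad-json", "c");
        System.out.println(run(log, PollLoopModel::strict, 5));
        System.out.println(run(log, PollLoopModel::lenient, 5));
    }
}
```

With the strict deserializer, every poll after the first fails at offset 1 and only [A] is delivered; with the lenient one the loop delivers [A, null, C] and reaches the end of the log.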