
I am using the Kafka Consumer API to build a consumer. The message structure is complex. To build the deserializer, I implemented the Deserializer interface and provided the necessary implementations. I am using the Jackson API for deserialization. I am getting this error: "Exception raisedorg.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition staging.datafeeds.PartnerHotel-0 at offset 19205124"

#POJO classes

    public class Change {
    private  Schema schema;
    private  Payload payload;
    //Getters and constructor
    }
    public class Details {
    private List<String> effectedAttributes;
    private List<PartnerHotel> cluster;
    //Getters and constructor
    }
    public class Field {
    private String type;
    private Boolean optional;
    private String field;
    //Getters and constructor
    }
    public class Fields {
    private String type;
    private List<Field> fields;
    private Boolean optional;
    private String name;
    //Getters and constructor
    }
    public class Geom{
    private int srid;
    private String wkb;
    //Getters and constructor
    }
    public class PartnerHotel{
    private int id;
    private int shopId;
    private String partnerHotelId;
    private boolean isOnline;
    private boolean isRemovedByUser;
    private int mappingPriority;
    private int hotelId;
    private String statusHotelId;
    private String name;
    private String street;
    private String zipCode;
    private String city;
    private String sourceCityId;
    private String state;
    private String stateAlpha2;
    private String country;
    private String alpha2;
    private String alpha3;
    private double latitude;
    private double longitude;
    private Geom geomPoint;
    private int countryIdShop;
    private int selectedGeoname;
    private String propertyType;
    private List<String> tags;
    private int stars;
    private String url;
    private int nrRatings;
    private double recommendation;
    private long dateHotelId;
    private long timeStamp;
    private long lastImport;
    //Getters and constructor
    }
    public class Payload {
    private PartnerHotel before;
    private PartnerHotel after;
    private Source source;
    private String op;
    private String ts_ms;
    //Getters and constructor
    }
    public class Schema {
    private String type;
    private Boolean optional;
    private String name;
    private List<Fields> fields;
    //Getters and constructor
    }
    public class Source {
    private String version;
    private String name;
    private String ts_usec;
    private String txId;
    private String lxn;
    private Boolean snapshot;
    private Object lastSnapshotRecord;
    //Getters and constructor
    }

#Deserializer

    public class ChangeDeserializer implements Deserializer<Change> {

    public ChangeDeserializer(){ }

    public void configure(Map<String, ?> map, boolean b) {}

    public Change deserialize(String topic, byte[] data) {
        if(data == null){
            return null;
        }
        try{
            ObjectMapper objectMapper = new ObjectMapper();
            Change change = objectMapper.readValue(data,Change.class);
            return change;
        }
        catch(IOException exception){
            throw new DeserializationException("Unable to deserialize Change", exception);
        }}

    public void close() {}
    }

#Consumer
    public class KafkaAcnowledger {
        public static void main(String[] args){
        Properties props = new Properties();
        props.put("bootstrap.servers", "someUrl");
        props.put("group.id", "test131");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("max.poll.records",1);
        props.put("auto.offset.reset","earliest");
        props.put("key.deserializer",    "org.apache.kafka.common.serialization.LongDeserializer");
        props.put("value.deserializer",    "deserializer.ChangeDeserializer");
        KafkaConsumer<Long, Change> consumer = new KafkaConsumer(props);
        consumer.subscribe(Arrays.asList("staging.datafeeds.PartnerHotel"));
        while (true) {
            try{
            ConsumerRecords<Long, Change> records = consumer.poll(100);
            for (ConsumerRecord<Long, Change> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
        catch(Exception exception){
                System.out.println("Exception raised" + exception);
        }
        }


    }
    }

The poll() call in the consumer looks fine, yet I am getting a SerializationException. I checked the consumer group via kafka-consumer-groups.sh, and this consumer's group is in the list. Any direction is appreciated.

Structure of the message in the Kafka topic:

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int16","optional":false,"field":"shopId"},{"type":"string","optional":false,"field":"partnerHotelId"},{"type":"boolean","optional":false,"field":"isOnline"},{"type":"boolean","optional":false,"field":"isRemovedByUser"},{"type":"int32","optional":false,"field":"mappingPriority"},{"type":"int32","optional":true,"field":"hotelId"},{"type":"string","optional":true,"field":"statusHotelId"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"dateHotelId"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"timestamp"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"lastImport"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"street"},{"type":"string","optional":true,"field":"zipcode"},{"type":"string","optional":true,"field":"city"},{"type":"string","optional":true,"field":"sourceCityId"},{"type":"string","optional":true,"field":"state"},{"type":"string","optional":true,"field":"stateAlpha2"},{"type":"string","optional":true,"field":"country"},{"type":"string","optional":true,"field":"alpha2"},{"type":"string","optional":true,"field":"alpha3"},{"type":"double","optional":true,"field":"latitude"},{"type":"double","optional":true,"field":"longitude"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"geomPoint"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedGeonames"},{"type":"int32","optional":true,"field":"countryIdShop"},{"type":"int32","optional":true,"field":"selectedGeoname"},{"type":"string","optional":true,"field":"propertyType"},{"type":"array","items":{"type":"st
ring","optional":true},"optional":false,"field":"tags"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"chains"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"creditCards"},{"type":"int32","optional":true,"field":"stars"},{"type":"string","optional":true,"field":"url"},{"type":"int32","optional":true,"field":"nrRatings"},{"type":"double","optional":true,"field":"recommendation"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedHotels"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedPartnerHotels"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"removedFromHotelIds"}],"optional":true,"name":"staging.datafeeds.PartnerHotel.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int16","optional":false,"field":"shopId"},{"type":"string","optional":false,"field":"partnerHotelId"},{"type":"boolean","optional":false,"field":"isOnline"},{"type":"boolean","optional":false,"field":"isRemovedByUser"},{"type":"int32","optional":false,"field":"mappingPriority"},{"type":"int32","optional":true,"field":"hotelId"},{"type":"string","optional":true,"field":"statusHotelId"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"dateHotelId"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"timestamp"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"lastImport"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"street"},{"type":"string","optional":true,"field":"zipcode"},{"type":"string","optional":true,"field":"city"},{"type":"string","optional":true,"field":"sourceCityId"},{"type":"string","optional":true,"field":"state"},{"type":"string","optional":true,"field":"stateAl
pha2"},{"type":"string","optional":true,"field":"country"},{"type":"string","optional":true,"field":"alpha2"},{"type":"string","optional":true,"field":"alpha3"},{"type":"double","optional":true,"field":"latitude"},{"type":"double","optional":true,"field":"longitude"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"geomPoint"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedGeonames"},{"type":"int32","optional":true,"field":"countryIdShop"},{"type":"int32","optional":true,"field":"selectedGeoname"},{"type":"string","optional":true,"field":"propertyType"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"tags"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"chains"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"creditCards"},{"type":"int32","optional":true,"field":"stars"},{"type":"string","optional":true,"field":"url"},{"type":"int32","optional":true,"field":"nrRatings"},{"type":"double","optional":true,"field":"recommendation"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedHotels"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedPartnerHotels"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"removedFromHotelIds"}],"optional":true,"name":"staging.datafeeds.PartnerHotel.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"db"},{"type":"int64","optional":true,"field":"ts_usec"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"string",
"optional":true,"field":"schema"},{"type":"string","optional":true,"field":"table"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},{"type":"boolean","optional":true,"field":"last_snapshot_record"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"staging.datafeeds.PartnerHotel.Envelope"},"payload":{"before":null,"after":{"id":13893497,"shopId":135,"partnerHotelId":"6-42036","isOnline":false,"isRemovedByUser":false,"mappingPriority":0,"hotelId":null,"statusHotelId":"AUTO","dateHotelId":null,"timestamp":1529334013938327,"lastImport":1503491984188866,"name":"Ferienvermietung Wiedemann","street":"Chausseeberg 3","zipcode":"17429","city":"Mellenthin","sourceCityId":null,"state":null,"stateAlpha2":null,"country":"Deutschland","alpha2":"DE","alpha3":null,"latitude":53.920278,"longitude":14.013333,"geomPoint":{"wkb":"AQEAACDmEAAARuo9ldMGLEA5nWSry/VKQA==","srid":4326},"proposedGeonames":[2872064],"countryIdShop":83,"selectedGeoname":2872064,"propertyType":null,"tags":["77","36","33","34","38","43","41","123","26","29","1","7","6","70","9","1000","58","17","18","15","13","14","20","65","63","46","10","52"],"chains":[],"creditCards":[],"stars":null,"url":"http://www.buchen.travel/onepage-idealo-booking/index.php?room=6-42036","nrRatings":null,"recommendation":null,"proposedHotels":[],"proposedPartnerHotels":[],"removedFromHotelIds":[]},"source":{"version":"0.8.3.Final","name":"staging","db":"geo","ts_usec":1554391067119000,"txId":4757138,"lsn":1139303143104,"schema":"datafeeds","table":"PartnerHotel","snapshot":true,"last_snapshot_record":false},"op":"r","ts_ms":1554391067119}}
Please clarify what you have tried. Does the same thing happen when you use a truly plain Java object (one whose fields are all primitive types)? Did you check that the serialized object has the same structure? - ADS
The serialized data is sent to the Kafka topic by Debezium (a Kafka Connect connector), which streams changes from the DB to Kafka. What I am trying to do is consume messages from the topic Debezium writes to and then perform some operations on them, so the first step is to get that data through proper deserialization. - Muhammad Sufyian
I suggest the first step is to narrow down the possible causes. Could you try sending a simple TestClass { long value; String text; } to a test topic? If you can deserialize it, the problem is somewhere in your classes and deserialization. If you can't, the problem is somewhere in the Debezium configuration. - ADS
I don't know Jackson, but does it support the type Object (Object lastSnapshotRecord;)? It will probably have to fall back to default Java serialization, which can cause problems if your class versions differ. I would try without that field for now. - Axel Podehl
Note: Spring Kafka and Confluent both already provide JSON deserializers. You should not need to write your own. - OneCricketeer

1 Answer


Your POJO is not compatible with your message, so Jackson cannot parse it. At the very least, several fields are missing; parsing the message produces the following errors:

Unrecognized field "timestamp" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "zipcode" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "proposedGeonames" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "chains" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "creditCards" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "proposedHotels" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "proposedPartnerHotels" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "removedFromHotelIds" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "db" (class  com.example.kafka.Change$Source), not marked as ignorable
Unrecognized field "lsn" (class  com.example.kafka.Change$Source), not marked as ignorable
Unrecognized field "schema" (class  com.example.kafka.Change$Source), not marked as ignorable
Unrecognized field "table" (class  com.example.kafka.Change$Source), not marked as ignorable
Unrecognized field "last_snapshot_record" (class  com.example.kafka.Change$Source), not marked as ignorable
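
Messages like the ones above do not show up in the consumer's output because the loop only prints the outer exception. A minimal sketch of one way to surface them, assuming the SerializationException thrown by poll() carries the deserializer's exception as its cause (the question does not confirm this). The class name `RootCause` and the stand-in exceptions in `main` are hypothetical and only illustrate the idea:

```java
import java.io.IOException;

public class RootCause {

    /** Returns the innermost cause of t, or t itself if it has no cause. */
    public static Throwable find(Throwable t) {
        Throwable current = t;
        // Stop at the end of the chain; also guard against a self-referencing cause.
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Stand-ins for the real exceptions: the inner one plays the parser error,
        // the outer one plays the wrapper that the consumer loop prints.
        Exception inner = new IOException("Unrecognized field \"timestamp\"");
        Exception outer = new RuntimeException("Error deserializing key/value", inner);

        // Print the message of the innermost exception instead of the wrapper.
        System.out.println(find(outer).getMessage());
    }
}
```

In the consumer's catch block, printing `RootCause.find(exception)` (or simply calling `exception.printStackTrace()`) would show the underlying parser message next to the generic wrapper text.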

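To fix it, either add those fields to your POJO or disable failing on unknown properties: objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);. Note that once unknown properties are ignored, POJO fields whose names differ from the message, such as timeStamp (message: timestamp), zipCode (zipcode), and lxn (lsn), will silently stay unset, so renaming them is the more complete fix. More on this Jackson deserialization error can be found here: jackson Unrecognized field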
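
To decide which fields to add or rename, a rough check is to compare the keys of the message with the declared field names of the POJO. The sketch below uses only reflection; the helper `FieldDiff.unmatchedKeys` and the cut-down `Source` class are hypothetical stand-ins, not part of Jackson or Kafka. Jackson's real matching also goes through getters, setters, and annotations, so treat the result as a hint only:

```java
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class FieldDiff {

    /** Returns the JSON keys that have no same-named declared field on the given type. */
    public static Set<String> unmatchedKeys(Class<?> type, Collection<String> jsonKeys) {
        Set<String> declared = new HashSet<>();
        for (Field f : type.getDeclaredFields()) {
            declared.add(f.getName());
        }
        // TreeSet keeps the result sorted so the output is stable.
        Set<String> unmatched = new TreeSet<>(jsonKeys);
        unmatched.removeAll(declared);
        return unmatched;
    }

    /** Stand-in with the same field names as the question's Source class. */
    public static class Source {
        private String version;
        private String name;
        private String ts_usec;
        private String txId;
        private String lxn;
        private Boolean snapshot;
        private Object lastSnapshotRecord;
    }

    public static void main(String[] args) {
        // Keys of the "source" object in the sample message from the question.
        List<String> keys = Arrays.asList("version", "name", "db", "ts_usec", "txId",
                "lsn", "schema", "table", "snapshot", "last_snapshot_record");

        // Prints: [db, last_snapshot_record, lsn, schema, table]
        System.out.println(unmatchedKeys(Source.class, keys));
    }
}
```

The five keys it reports for `Source` are the same ones listed in the errors above, which suggests `lxn` and `lastSnapshotRecord` were meant to map to `lsn` and `last_snapshot_record`.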