
I am trying to deserialize Avro messages from Kafka into a POJO generated from an Avro schema. I am using KafkaAvroDeserializer for this conversion.

I can see the GenericRecord in the ConsumerRecord<String, Data> record returned from Kafka. But when the record is assigned to the generated POJO class, it fails on a date-typed field with a ClassCastException. When I inspected the Avro payload, this date field arrives as an integer.

Setup: Avro 1.9.1, Confluent 5.4, commercehub gradle-avro-plugin 0.20.0

While deserializing the Avro message, I get the following error:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 66
Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.time.LocalDate
    at com.sample.Data.put(Data.java:229) ~[main/:na]
    at org.apache.avro.generic.GenericData.setField(GenericData.java:795) ~[avro-1.9.1.jar:1.9.1]
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139) ~[avro-1.9.1.jar:1.9.1]
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237) ~[avro-1.9.1.jar:1.9.1]
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[avro-1.9.1.jar:1.9.1]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170) ~[avro-1.9.1.jar:1.9.1]
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136) ~[avro-1.9.1.jar:1.9.1]
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237) ~[avro-1.9.1.jar:1.9.1]
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) ~[avro-1.9.1.jar:1.9.1]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170) ~[avro-1.9.1.jar:1.9.1]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) ~[avro-1.9.1.jar:1.9.1]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144) ~[avro-1.9.1.jar:1.9.1]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.read(AbstractKafkaAvroDeserializer.java:287) ~[kafka-avro-serializer-5.4.0.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:102) ~[kafka-avro-serializer-5.4.0.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:81) ~[kafka-avro-serializer-5.4.0.jar:na]
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55) ~[kafka-avro-serializer-5.4.0.jar:na]
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1310) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1541) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1377) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:677) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:632) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1290) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) ~[kafka-clients-2.5.0.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1091) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1047) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:972) [spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_241]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_241]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_241]

Avro schema of the field for which conversion fails with the ClassCastException:

{
    "name": "BIRTH_DT",
    "type": [
        "null",
        {
            "type": "int",
            "logicalType": "date"
        }
    ],
    "default": null
}
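
For reference, the Avro `date` logical type annotates an `int` holding the number of days since 1970-01-01, which is why the raw payload shows an integer. The following is only a minimal sketch of that mapping using `java.time`; the class name `AvroDateSketch`, its helper methods, and the sample value are hypothetical and are not part of Avro or of the generated code.

```java
import java.time.LocalDate;

public class AvroDateSketch {

    // Avro `date`: an int counting days since the Unix epoch (1970-01-01).
    public static LocalDate fromAvroDate(int daysSinceEpoch) {
        return LocalDate.ofEpochDay(daysSinceEpoch);
    }

    // Inverse mapping; toIntExact throws if the day count does not fit in an int.
    public static int toAvroDate(LocalDate date) {
        return Math.toIntExact(date.toEpochDay());
    }

    public static void main(String[] args) {
        int raw = 18262; // hypothetical value as it might appear in the payload
        System.out.println(fromAvroDate(raw)); // prints 2020-01-01
    }
}
```

If the reader applies the logical-type conversion, the field arrives as a `LocalDate`; if it does not, the field arrives as the raw `Integer` shown above.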

Code Snippets from Generated POJO

  @Deprecated public java.time.LocalDate BIRTH_DT;

  // Used by DatumReader.  Applications should not call.
  @SuppressWarnings(value="unchecked")
  public void put(int field$, java.lang.Object value$) {
    switch (field$) {
    .
    .
    case 8: BIRTH_DT = (java.time.LocalDate)value$; break;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }

  public java.time.LocalDate getBIRTHDT() {
    return BIRTH_DT;
  }

  public void setBIRTHDT(java.time.LocalDate value) {
      this.BIRTH_DT = value;
  }
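
To make the failure mode concrete, here is a small stand-alone sketch that mirrors the unchecked cast in the generated `put` method. It assumes the reader hands over the raw `Integer` rather than a converted `LocalDate`. The class `PutCastSketch` and the method `putWithConversion` are hypothetical illustrations only; the generated code does not contain them, and this is not a recommended change to the POJO.

```java
import java.time.LocalDate;

public class PutCastSketch {

    static LocalDate birthDt;

    // Mirrors the generated put(): a plain cast, no logical-type conversion.
    public static void put(Object value) {
        birthDt = (LocalDate) value;
    }

    // Hypothetical tolerant variant, shown only to illustrate what a
    // conversion step would have to do with a raw epoch-day Integer.
    public static void putWithConversion(Object value) {
        if (value instanceof Integer) {
            birthDt = LocalDate.ofEpochDay((Integer) value);
        } else {
            birthDt = (LocalDate) value;
        }
    }

    // Returns true when the plain cast rejects the value.
    public static boolean castFails(Object value) {
        try {
            put(value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Object raw = Integer.valueOf(18262);        // unconverted Avro date
        System.out.println(castFails(raw));         // the plain cast throws
        putWithConversion(raw);
        System.out.println(birthDt);
    }
}
```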

Kafka Consumer Method

    @KafkaListener(topics = "${spring.kafka.consumer.properties.topic}",
                     groupId = "${spring.kafka.consumer.group-id}")
    // Data is a POJO generated by Avro tools
    public void consume(ConsumerRecord<String, Data> record,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
                        @Header(KafkaHeaders.OFFSET) Long offset, Acknowledgment ack) throws IOException {
    
        logger.info(String.format("#### -> Consumed message -> partition: %s, offset: %s", partition, offset));
        Data row = record.value();
        ack.acknowledge();
    }

build.gradle

buildscript {
    repositories {
        jcenter {
            url "https://nexus.abc.com:8443/content/repositories/jcenter/"
        }
    }
    dependencies {
        classpath "com.commercehub.gradle.plugin:gradle-avro-plugin:0.20.0"
    }
}

plugins {
    id 'org.springframework.boot' version '2.3.1.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'java'
    id 'idea'
    id 'eclipse'
}

repositories {
    maven { url nexusPublicRepoURL }
    maven { url "https://nexus.abc.com:8443/content/repositories/confluence.io-maven/" }
    jcenter()
    maven { url "https://nexus.abc.com:8443/content/repositories/jcenter/" }
}

group = 'com.abc.cscm'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '8'
targetCompatibility = '8'

ext {
    springCloudVersion = 'Hoxton.SR6'
    confluentVersion = '5.4.0'
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.kafka:spring-kafka'

    implementation "io.confluent:kafka-avro-serializer:${confluentVersion}"

    implementation 'org.apache.avro:avro:1.9.1'

    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}

springBoot {
    buildInfo()
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

test {
    useJUnitPlatform()
}

wrapper {
    distributionUrl = "https://nexus.abc.com:8443/service/local/repositories/thirdparty/content/org/gradle/gradle/6.5/gradle-6.5.zip"
}

apply plugin: "com.commercehub.gradle.plugin.avro"
apply plugin: 'idea'

./gradlew dependencies --configuration compileClasspath (Output)

> Task :dependencies

------------------------------------------------------------
Root project
------------------------------------------------------------

compileClasspath - Compile classpath for source set 'main'.
                        ** omitting spring deps
+--- io.confluent:kafka-avro-serializer:5.4.0
|    +--- org.apache.avro:avro:1.9.1
|    |    +--- com.fasterxml.jackson.core:jackson-core:2.9.9 -> 2.11.0
|    |    +--- com.fasterxml.jackson.core:jackson-databind:2.9.9.3 -> 2.11.0 (*)
|    |    +--- org.apache.commons:commons-compress:1.19
|    |    \--- org.slf4j:slf4j-api:1.7.25 -> 1.7.30
|    +--- io.confluent:kafka-schema-registry-client:5.4.0
|    |    +--- org.apache.kafka:kafka-clients:5.4.0-ccs -> 2.5.0 (*)
|    |    +--- io.confluent:common-config:5.4.0
|    |    |    +--- io.confluent:common-utils:5.4.0
|    |    |    |    \--- org.slf4j:slf4j-api:1.7.26 -> 1.7.30
|    |    |    \--- org.slf4j:slf4j-api:1.7.26 -> 1.7.30
|    |    +--- org.apache.avro:avro:1.9.1 (*)
|    |    +--- com.fasterxml.jackson.core:jackson-databind:2.9.10.1 -> 2.11.0 (*)
|    |    +--- io.swagger:swagger-annotations:1.5.22
|    |    +--- io.swagger:swagger-core:1.5.3
|    |    |    +--- org.apache.commons:commons-lang3:3.2.1 -> 3.10
|    |    |    +--- org.slf4j:slf4j-api:1.6.3 -> 1.7.30
|    |    |    +--- com.fasterxml.jackson.core:jackson-annotations:2.4.5 -> 2.11.0
|    |    |    +--- com.fasterxml.jackson.core:jackson-databind:2.4.5 -> 2.11.0 (*)
|    |    |    +--- com.fasterxml.jackson.datatype:jackson-datatype-joda:2.4.5 -> 2.11.0
|    |    |    |    +--- com.fasterxml.jackson.core:jackson-core:2.11.0
|    |    |    |    \--- joda-time:joda-time:2.9.9
|    |    |    +--- com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.4.5 -> 2.11.0
|    |    |    |    +--- com.fasterxml.jackson.core:jackson-databind:2.11.0 (*)
|    |    |    |    +--- org.yaml:snakeyaml:1.26
|    |    |    |    \--- com.fasterxml.jackson.core:jackson-core:2.11.0
|    |    |    +--- io.swagger:swagger-models:1.5.3
|    |    |    |    +--- com.fasterxml.jackson.core:jackson-annotations:2.4.5 -> 2.11.0
|    |    |    |    +--- org.slf4j:slf4j-api:1.6.3 -> 1.7.30
|    |    |    |    \--- io.swagger:swagger-annotations:1.5.3 -> 1.5.22
|    |    |    \--- com.google.guava:guava:18.0 -> 29.0-android
|    |    |         +--- com.google.guava:failureaccess:1.0.1
|    |    |         +--- com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava
|    |    |         +--- com.google.code.findbugs:jsr305:3.0.2
|    |    |         +--- org.checkerframework:checker-compat-qual:2.5.5
|    |    |         +--- com.google.errorprone:error_prone_annotations:2.3.4
|    |    |         \--- com.google.j2objc:j2objc-annotations:1.3
|    |    \--- io.confluent:common-utils:5.4.0 (*)
|    +--- io.confluent:common-config:5.4.0 (*)
|    \--- io.confluent:common-utils:5.4.0 (*)
\--- org.apache.avro:avro:1.9.1 (*)

./gradlew buildEnvironment (Output)

classpath
+--- com.commercehub.gradle.plugin:gradle-avro-plugin:0.20.0
|    \--- org.apache.avro:avro-compiler:1.9.2    <<<<<<<<<<<<<<<<<<<<<<<<<<
|         +--- org.apache.avro:avro:1.9.2
|         |    +--- com.fasterxml.jackson.core:jackson-core:2.10.2 -> 2.11.0
|         |    +--- com.fasterxml.jackson.core:jackson-databind:2.10.2 -> 2.11.0
|         |    |    +--- com.fasterxml.jackson.core:jackson-annotations:2.11.0
|         |    |    \--- com.fasterxml.jackson.core:jackson-core:2.11.0
|         |    +--- org.apache.commons:commons-compress:1.19
|         |    \--- org.slf4j:slf4j-api:1.7.25 -> 1.7.30
|         +--- org.apache.commons:commons-lang3:3.9 -> 3.10
|         +--- org.apache.velocity:velocity-engine-core:2.2
|         |    +--- org.apache.commons:commons-lang3:3.9 -> 3.10
|         |    \--- org.slf4j:slf4j-api:1.7.30
|         +--- com.fasterxml.jackson.core:jackson-databind:2.10.2 -> 2.11.0 (*)
|         +--- joda-time:joda-time:2.10.1
|         \--- org.slf4j:slf4j-api:1.7.25 -> 1.7.30

I am not sure whether I should edit the generated POJO class or whether I am missing something.

I was able to convert the Avro message into the POJO by changing the schema as described in the question below, but that feels hacky and does not solve the underlying problem.

Question - Avro is not able to deserialize Union with Logical Types in fields

Comments:
Make sure your serializer and deserializer are using the same date type object. - Saurabh Nigam
Yes, the same date type object is used for serialization and deserialization. - abb

1 Answer


Could you please clarify which version of avro-maven-plugin you are using to generate the POJO? As of Avro 1.9.0, Joda-Time has been deprecated in favor of Java 8 JSR-310, and the Java 8 date/time types are the default. See the Apache Avro 1.9.0 release notes.

When I generate a POJO from scratch, I get java.time.LocalDate BIRTH_DT and not org.joda.time.LocalDate BIRTH_DT.

   @Deprecated public java.time.LocalDate BIRTH_DT;

So in your case I think there is most likely an Avro version mismatch on the classpath, or a stale POJO. I would recommend verifying the Avro version with mvn dependency:tree -Dincludes=org.apache.avro:avro and regenerating the POJO.
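
As a complement to the dependency tree, one way to check which jar actually supplies a class at runtime is to ask the JVM for the class's code source. This is a minimal sketch using only the JDK; the class `ClassOriginSketch` and the method `originOf` are hypothetical names, and the class names passed in `main` are taken from the stack trace in the question. It only reports locations, so it can hint at a mismatch (for example, the question's buildEnvironment output lists avro-compiler 1.9.2 while the runtime classpath has avro 1.9.1) but does not prove the cause.

```java
import java.security.CodeSource;

public class ClassOriginSketch {

    // Returns where a class was loaded from, without initializing it.
    public static String originOf(String className) {
        try {
            Class<?> cls = Class.forName(
                    className, false, ClassOriginSketch.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                return "<no code source (e.g. JDK class)>";
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "<not on classpath>";
        }
    }

    public static void main(String[] args) {
        // Run inside the application so its real classpath is in effect.
        System.out.println(originOf("org.apache.avro.Schema"));
        System.out.println(originOf("com.sample.Data"));
    }
}
```

Run from within the consumer application, the first line would show the Avro jar in use, and the second would show where the generated POJO was compiled to, which can help spot a stale class.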