
I am a beginner with Scala and Apache Flink, but up to this point everything has worked out for me. I am trying to consume Kafka events serialized to Avro from my Flink application. I read the documentation (https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema) and googled for many hours, but I am still stuck. I have a case class:

    case class URLResponse(status: Int, domain: String, url: String, queue: String, html: String)

and a schema:

    val schema: Schema = new Schema.Parser().parse("{\"type\": \"record\", \"name\": \"URLResponse\",\"fields\": [{\"name\": \"status\", \"type\": \"long\"}, {\"name\": \"domain\", \"type\": \"string\"}, {\"name\": \"url\", \"type\": \"string\"}, {\"name\": \"queue\", \"type\": \"string\"}, {\"name\": \"html\", \"type\": \"string\"}]}")

I tried three approaches:

1. Using the generic deserialization schema:

       val stream = env.addSource(new FlinkKafkaConsumer(kafkaTopic, AvroDeserializationSchema.forGeneric(schema), properties))

   An error occurs at run time:

       Serialization trace:
       reserved (org.apache.avro.Schema$Field)
       fieldMap (org.apache.avro.Schema$RecordSchema)
       schema (org.apache.avro.generic.GenericData$Record)

2. Using the specific deserialization schema:

       val stream = env.addSource(new FlinkKafkaConsumer[URLResponse](kafkaTopic, AvroDeserializationSchema.forSpecific(classOf[URLResponse]), properties))

   The build fails:

       inferred type arguments [schemas.URLResponse] do not conform to method forSpecific's type parameter bounds [T <: org.apache.avro.specific.SpecificRecord]
             kafkaTopic, AvroDeserializationSchema.forSpecific(classOf[URLResponse]), properties))

3. Calling the AvroDeserializationSchema constructor directly:

       val stream = env.addSource(new FlinkKafkaConsumer[URLResponse](kafkaTopic, new AvroDeserializationSchema[URLResponse](classOf[URLResponse]), properties))

   The build fails:

       constructor AvroDeserializationSchema in class AvroDeserializationSchema cannot be accessed in object MyApp
           val des: AvroDeserializationSchema[URLResponse] = new AvroDeserializationSchema[URLResponse](classOf[URLResponse])

Please help! What's the preferred approach, and why doesn't it work? Thanks!


1 Answer


It seems the first approach is the recommended one. The exception above comes from an issue with the Scala API and Avro deserialization: without explicit Avro type information, the GenericRecord apparently falls back to Kryo serialization, which fails on the nested org.apache.avro.Schema. It works fine if I use the Java implementation (https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro). My solution:

    // Build the source with the Java API and attach explicit Avro type information,
    // so Flink does not fall back to Kryo for GenericRecord.
    val javaStream = env.getJavaEnv.addSource(new FlinkKafkaConsumer[GenericRecord](
      kafkaTopic, ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, schemaRegistryURL), properties),
      new GenericRecordAvroTypeInfo(schema))
    // Wrap the Java DataStream back into the Scala API.
    val stream = new DataStream[GenericRecord](javaStream)
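
For reference, a minimal sketch of the imports this relies on (assuming the flink-avro and flink-avro-confluent-registry dependencies are on the classpath) and of mapping the GenericRecord stream back onto the case class. The field names follow the schema above, the typed value is just illustrative, and Avro string fields come back as Utf8, so they need toString:

    import org.apache.avro.generic.GenericRecord
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
    import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

    // Illustrative: map each GenericRecord onto the case class by field name.
    val typed: DataStream[URLResponse] = stream.map { record =>
      URLResponse(
        status = record.get("status").asInstanceOf[Long].toInt, // the schema declares "long"
        domain = record.get("domain").toString,                  // Avro strings arrive as Utf8
        url    = record.get("url").toString,
        queue  = record.get("queue").toString,
        html   = record.get("html").toString
      )
    }

Passing the GenericRecordAvroTypeInfo explicitly is what keeps Flink from falling back to Kryo for the GenericRecord type.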