I am a complete beginner with Scala and Apache Flink, but up to this point everything has worked out for me. I am trying to consume Kafka events serialized to Avro from my Flink application. I read the documentation (https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema) and googled for many hours, but I am still stuck.
I have a case class:

case class URLResponse(status: Int, domain: String, url: String, queue: String, html: String)

and a schema:

val schema: Schema = new Schema.Parser().parse(
  """{"type": "record", "name": "URLResponse", "fields": [
    |  {"name": "status", "type": "long"},
    |  {"name": "domain", "type": "string"},
    |  {"name": "url", "type": "string"},
    |  {"name": "queue", "type": "string"},
    |  {"name": "html", "type": "string"}
    |]}""".stripMargin)

I tried three approaches:
1. Using forGeneric:

val stream = env.addSource(new FlinkKafkaConsumer(kafkaTopic, AvroDeserializationSchema.forGeneric(schema), properties))

An error occurs at run time:
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
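If I read the trace right, Flink falls back to Kryo when a GenericRecord (which carries its Schema) flows between operators. What I am ultimately trying to do is convert to my case class right after the source, so that downstream operators only see URLResponse. A minimal sketch of that, assuming the field accessors match my schema (the conversions are my guess, not tested):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.formats.avro.AvroDeserializationSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

// Consume as GenericRecord, then convert to the case class immediately so
// later operators never have to (Kryo-)serialize GenericRecord itself.
val genericStream = env.addSource(
  new FlinkKafkaConsumer(kafkaTopic, AvroDeserializationSchema.forGeneric(schema), properties))

val typedStream: DataStream[URLResponse] = genericStream.map { record =>
  URLResponse(
    record.get("status").asInstanceOf[Long].toInt, // the schema declares "long"
    record.get("domain").toString,                 // Avro strings arrive as Utf8
    record.get("url").toString,
    record.get("queue").toString,
    record.get("html").toString)
}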
2. Using forSpecific:

val stream = env.addSource(new FlinkKafkaConsumer[URLResponse](kafkaTopic, AvroDeserializationSchema.forSpecific(classOf[URLResponse]), properties))

The build fails:
inferred type arguments [schemas.URLResponse] do not conform to method forSpecific's type parameter bounds [T <: org.apache.avro.specific.SpecificRecord]
kafkaTopic, AvroDeserializationSchema.forSpecific(classOf[URLResponse]), properties))
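From the error message I gather that forSpecific only accepts classes implementing org.apache.avro.specific.SpecificRecord, i.e. classes generated from the .avsc by the Avro compiler (avro-tools, an sbt plugin, etc.), not a hand-written case class. A sketch of what I think it expects, where URLResponseRecord is a hypothetical generated class (not my case class):

// URLResponseRecord would be generated from the .avsc and extend
// org.apache.avro.specific.SpecificRecordBase, satisfying T <: SpecificRecord.
val stream = env.addSource(new FlinkKafkaConsumer(
  kafkaTopic, AvroDeserializationSchema.forSpecific(classOf[URLResponseRecord]), properties))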
3. Calling the constructor directly:

val stream = env.addSource(new FlinkKafkaConsumer[URLResponse](kafkaTopic, new AvroDeserializationSchema[URLResponse](classOf[URLResponse]), properties))

The build fails:
constructor AvroDeserializationSchema in class AvroDeserializationSchema cannot be accessed in object MyApp
val des: AvroDeserializationSchema[URLResponse] = new AvroDeserializationSchema[URLResponse](classOf[URLResponse])
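So the constructor seems to be deliberately non-public, and if I understand the API right, the only supported entry points are the two static factories already shown above:

// The accessible API appears to be the factory methods, not the constructor:
val genericDes = AvroDeserializationSchema.forGeneric(schema) // deserializes to GenericRecord
// AvroDeserializationSchema.forSpecific(...) // for Avro-generated SpecificRecord classes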
Please help! What's the preferred approach, and why doesn't it work? Thanks!