import spark.implicits._ val ds1 = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "localhost:9092") .option("subscribe", "test") .option("startingOffsets", "latest") .load() .as[KafkaMessage] .select($"value".as[Array[Byte]]) .map(msg=>{
val byteArrayInputStream = new ByteArrayInputStream(msg)
val datumReader:DatumReader[GenericRecord] = new SpecificDatumReader[GenericRecord](messageSchema)
val dataFileReader:DataFileStream[GenericRecord] = new DataFileStream[GenericRecord](byteArrayInputStream, datumReader)
while(dataFileReader.hasNext) {
val userData1: GenericRecord = dataFileReader.next()
userData1.asInstanceOf[org.apache.avro.util.Utf8].toString
}
})
Error: Error:(49, 9) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. .map(msg=>{