I want to read streaming data from Kafka topics and write it to S3 in Avro or Parquet format. The records in the data stream are JSON strings, but I am not able to convert them and write them to S3 in Avro or Parquet format.
Based on some code snippets I found, I tried:
val sink = StreamingFileSink
  .forBulkFormat(new Path(outputS3Path), ParquetAvroWriters.forReflectRecord(classOf[myClass]))
  .build()
But I got "Type mismatch, expected SinkFunction[String], actual: StreamingFileSink[TextOut]" at the addSink call here:
val stream = env
  .addSource(myConsumerSource)
  .addSink(sink)
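The mismatch comes from the element types: `env.addSource(myConsumerSource)` yields a `DataStream[String]`, while a Parquet sink built from `classOf[myClass]` consumes `myClass` records (`TextOut` in the error), so each JSON string has to be parsed into a record before `addSink`. Below is a minimal sketch of that, assuming a Flink 1.x version that still ships the Scala DataStream API and `FlinkKafkaConsumer`, with `flink-parquet`, `parquet-avro`, the Kafka connector and Jackson on the classpath. `MyRecord`, its fields, the topic, the broker address and the bucket path are all hypothetical placeholders. Two further points: bulk formats like Parquet only finalize files on checkpoints, so checkpointing must be enabled, and writing `s3://` paths with `StreamingFileSink` requires the `flink-s3-fs-hadoop` filesystem plugin.

```scala
import java.util.Properties

import scala.beans.BeanProperty

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

// Hypothetical record type: replace the fields with those in your JSON.
// A bean with a no-arg constructor lets Jackson populate it and lets
// Avro's ReflectData derive the Parquet schema from its fields.
// Note: ReflectData maps String to a non-nullable Avro string, so `id`
// must be present in every JSON message.
class MyRecord {
  @BeanProperty var id: String = _
  @BeanProperty var value: Double = 0.0
}

object JsonParsing {
  // One mapper per JVM; ObjectMapper is thread-safe once configured.
  lazy val mapper: ObjectMapper = new ObjectMapper()
    .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

  // Turns one JSON string from Kafka into a typed record.
  def parse(json: String): MyRecord = mapper.readValue(json, classOf[MyRecord])
}

object KafkaToS3Parquet {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Parquet part files are only committed on checkpoints; without this
    // they stay in-progress and never become visible in S3.
    env.enableCheckpointing(60000L)

    // Hypothetical connection settings.
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "kafka-to-s3")
    val myConsumerSource =
      new FlinkKafkaConsumer[String]("my-topic", new SimpleStringSchema(), props)

    val outputS3Path = "s3://my-bucket/output" // hypothetical bucket

    val sink: StreamingFileSink[MyRecord] = StreamingFileSink
      .forBulkFormat(
        new Path(outputS3Path),
        ParquetAvroWriters.forReflectRecord(classOf[MyRecord]))
      .build()

    env
      .addSource(myConsumerSource)          // DataStream[String]
      .map(json => JsonParsing.parse(json)) // DataStream[MyRecord]
      .addSink(sink)                        // element types now match

    env.execute("kafka-to-s3-parquet")
  }
}
```

The same idea applies to Avro output: whatever writer factory you choose, its type parameter must match the element type of the stream passed to `addSink`.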
Please help, thanks!
