
I want to read streaming data from Kafka topics and write it to S3 in Avro or Parquet format. The data stream consists of JSON strings, but I am not able to convert them and write the result to S3 as Avro or Parquet.

I found some code snippets and tried this:

val sink = StreamingFileSink
  .forBulkFormat(new Path(outputS3Path), ParquetAvroWriters.forReflectRecord(classOf[myClass]))
  .build()

But at the addSink call I got "Type mismatch, expected: SinkFunction[String], actual: StreamingFileSink[TextOut]":

val stream = env
  .addSource(myConsumerSource)
  .addSink(sink)

Please help, thanks!

2 Answers


This is code I have working that stores Parquet files on the local filesystem.

import org.apache.avro.generic.GenericRecord
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

val env = StreamExecutionEnvironment.getExecutionEnvironment()
// Bulk formats such as Parquet roll part files on checkpoints,
// so checkpointing must be enabled for the sink to finalize files
env.enableCheckpointing(100)

// Avro schema with a single required string field named "message"
val schema = SchemaBuilder
  .record("record")
  .fields()
  .requiredString("message")
  .endRecord()

// genericRecordList is a pre-built collection of GenericRecord
// instances conforming to the schema above
val stream: DataStreamSource[GenericRecord] = env.fromCollection(genericRecordList)
val path = new Path(s"/tmp/flink-parquet-${System.currentTimeMillis()}")
val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
  .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(schema))
  .build()

stream.addSink(sink)
env.execute()
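
To connect this back to the question: the type mismatch happens because the Kafka source emits String while the sink consumes GenericRecord, so the stream has to be mapped into GenericRecord before addSink. A minimal sketch, assuming the same schema and sink as above are in scope and that, for illustration, each raw JSON string is simply stored in the "message" field (in a real job you would parse the JSON, e.g. with Jackson, and populate the record fields from it):

import org.apache.avro.generic.{GenericData, GenericRecord}

// Map each incoming JSON string from Kafka into a GenericRecord
// that matches the Avro schema expected by the Parquet sink
val records = env
  .addSource(myConsumerSource)          // DataStream of String from Kafka
  .map { json: String =>
    val record: GenericRecord = new GenericData.Record(schema)
    record.put("message", json)         // simplified: store the raw JSON
    record
  }

records.addSink(sink)                   // sink is the StreamingFileSink[GenericRecord] above
env.execute()

For S3 output, pass an s3 URI to the sink's Path instead of a local directory. Depending on the Flink version, you may also need to supply Avro type information for the GenericRecord stream (e.g. via GenericRecordAvroTypeInfo), since GenericRecord is not serializable by Flink's default type extraction.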