Flink streaming: I have a DataStream[String] from a FlinkKafkaConsumer, where each element is a JSON string.
val stream: DataStream[String] = env
  .addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
I need to write this stream with StreamingFileSink, which, when used with AvroWriters.forGenericRecord, needs a DataStream[GenericRecord]:
val schema: Schema = ...
val input: DataStream[GenericRecord] = ...
val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
  .forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
  .build()
input.addSink(sink)
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html
Question: How do I convert a DataStream[String] to a DataStream[GenericRecord] before sinking, so that I can write Avro files?
Exception while converting the String stream to a GenericRecord stream:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:791)
at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:1168)
at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:617)
at com.att.vdcs.StreamingJobKafkaFlink$.main(StreamingJobKafkaFlink.scala:128)
at com.att.vdcs.StreamingJobKafkaFlink.main(StreamingJobKafkaFlink.scala)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:406)
... 7 more
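The trace above indicates that the closure passed to map captured an org.apache.avro.Schema, which is not serializable in this setup. A minimal sketch of one common workaround follows: carry the schema as a String and parse it in open() on the task manager. The class name JsonToGenericRecord and the single string field "body" are assumptions for illustration, not taken from the original job.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Hypothetical mapper: holds the schema as a String (serializable)
// and rebuilds the Schema object on each parallel instance.
class JsonToGenericRecord(schemaJson: String)
    extends RichMapFunction[String, GenericRecord] {

  // Marked transient so it is not shipped with the function.
  @transient private var schema: Schema = _

  override def open(parameters: Configuration): Unit = {
    schema = new Schema.Parser().parse(schemaJson)
  }

  override def map(json: String): GenericRecord = {
    val record = new GenericData.Record(schema)
    // Assumption: the schema has one string field named "body".
    record.put("body", json)
    record
  }
}
```

It could be used as stream.map(new JsonToGenericRecord(schema.toString)). The Scala API also needs an implicit TypeInformation[GenericRecord] in scope; new GenericRecordAvroTypeInfo(schema) from flink-avro is one option that avoids falling back to Kryo.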
After initializing the schema inside the mapper, I get a cast exception:
org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException: scala.Tuple2 cannot be cast to java.util.Map
The schema and msg are shown in the screenshot below:

I got past the cast exception by converting the value to a Java map:
record.put(0,scala.collection.JavaConverters.mapAsJavaMapConverter(msg._1).asJava)
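The cast error suggests that Avro's generic writer casts the value of a map-typed field to java.util.Map, so a Scala collection has to be converted first. A small sketch of that conversion, using only the Scala standard library; the helper name toAvroMap and the Map[String, String] element types are assumptions, since the actual type of msg._1 is not shown.

```scala
import scala.collection.JavaConverters._

object MapFieldConversion {
  // Hypothetical helper: wraps a Scala Map as a java.util.Map view
  // so it can be stored in a map-typed Avro field.
  def toAvroMap(m: Map[String, String]): java.util.Map[String, String] =
    m.asJava
}
```

With this helper the call above could be written as record.put(0, MapFieldConversion.toAvroMap(msg._1)).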
Now streaming works, except that extra escape characters are added:
,"body":"\"{\\\"hdr\\\":{\\\"mes
There are extra escape characters (\).
It should look like:
,"body":"\"{\"hdr\":{\"mes
The extra escaping went away after changing toString to getAsString.
Now it works as expected.
Next, I need to try Snappy compression of the stream.
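For context, here is a sketch of why that change matters, assuming the JSON library is Gson (getAsString is a method of Gson's JsonElement; the original post does not name the library). On a string element, toString returns the JSON-encoded form with surrounding quotes and backslashes, while getAsString returns the raw string value. Storing the encoded form in a string field means it gets escaped a second time when the record is rendered.

```scala
import com.google.gson.JsonParser

object EscapeDemo {
  // Sample input (assumption): "body" holds a JSON document as a string.
  private val text = """{"body":"{\"hdr\":1}"}"""

  // new JsonParser().parse is deprecated in newer Gson releases
  // but is available across versions.
  private val body =
    new JsonParser().parse(text).getAsJsonObject.get("body")

  // JSON-encoded: keeps the surrounding quotes and the backslashes.
  val encoded: String = body.toString

  // Raw value: quotes and escapes removed.
  val raw: String = body.getAsString
}
```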
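A possible starting point for that, as an untested sketch: AvroWriters.forGenericRecord does not take a codec, but flink-avro exposes AvroBuilder and AvroWriterFactory, which allow building the DataFileWriter by hand and setting a codec on it. The names SnappyAvroBuilder and SnappySink are hypothetical, the schema is again passed as a String to avoid the serialization problem, and the snappy-java library is assumed to be on the classpath.

```scala
import java.io.OutputStream
import org.apache.avro.Schema
import org.apache.avro.file.{CodecFactory, DataFileWriter}
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.flink.formats.avro.{AvroBuilder, AvroWriterFactory}

// Hypothetical builder: creates a Snappy-compressed Avro file writer.
class SnappyAvroBuilder(schemaJson: String) extends AvroBuilder[GenericRecord] {
  override def createWriter(out: OutputStream): DataFileWriter[GenericRecord] = {
    // Parse here so that no Schema object has to be serialized.
    val schema = new Schema.Parser().parse(schemaJson)
    val writer = new DataFileWriter[GenericRecord](
      new GenericDatumWriter[GenericRecord](schema))
    // The codec must be set before create() writes the file header.
    writer.setCodec(CodecFactory.snappyCodec())
    writer.create(schema, out)
    writer
  }
}

object SnappySink {
  // Hypothetical factory method for use with forBulkFormat.
  def factory(schemaJson: String): AvroWriterFactory[GenericRecord] =
    new AvroWriterFactory[GenericRecord](new SnappyAvroBuilder(schemaJson))
}
```

The factory would replace the writer argument in the sink: StreamingFileSink.forBulkFormat(outputBasePath, SnappySink.factory(schema.toString)).build().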