1
votes

I want to read a CSV file with Flink in Scala, using the addSource and readCsvFile functions. I have not found any simple examples of this. I have only found https://github.com/dataArtisans/flink-training-exercises/blob/master/src/main/scala/com/dataartisans/flinktraining/exercises/datastream_scala/cep/LongRides.scala, which is too complex for my purpose.

Given the definition StreamExecutionEnvironment.addSource(sourceFunction), should I simply use readCsvFile as the sourceFunction?

After reading the file, I want to use CEP (Complex Event Processing).

2
Yes, but I did not find any combination of addSource() with readCsvFile(), or even with readTextFile() or readFile(). Is it possible? I ask because of: ci.apache.org/projects/flink/flink-docs-release-1.4/dev/… - Eras Rasmuson

2 Answers

1
votes

readCsvFile() is only available as part of Flink's DataSet (batch) API, and cannot be used with the DataStream (streaming) API. Here's a pretty good example of readCsvFile(), though it's probably not relevant to what you're trying to do.

readTextFile() and readFile() are methods on StreamExecutionEnvironment, and do not implement the SourceFunction interface -- they are not meant to be used with addSource(), but rather instead of it. Here's an example of using readTextFile() to load a CSV using the DataStream API.
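As a rough illustration of the readTextFile() route, here is a minimal sketch. It assumes a three-column file (String, Long, Int) with no header row and no quoted or embedded commas. The `Event` case class, the `parseLine` helper, the object name, and the file path are hypothetical names chosen for this example.

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical record type: adjust the fields to match your CSV columns.
case class Event(name: String, timestamp: Long, count: Int)

object CsvStreamJob {
  // Naive comma split; assumes no quoted fields or embedded commas.
  def parseLine(line: String): Event = {
    val cols = line.split(",").map(_.trim)
    Event(cols(0), cols(1).toLong, cols(2).toInt)
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // readTextFile() is called directly on the environment, not via addSource().
    val events: DataStream[Event] = env
      .readTextFile("file:///path/to/your_csv_file") // placeholder path
      .filter(line => line.trim.nonEmpty)            // skip blank lines
      .map(line => parseLine(line))

    events.print()
    env.execute("Read CSV as DataStream")
  }
}
```

The resulting DataStream[Event] is the kind of input a CEP pattern would be applied to.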

Another option is to use the Table API, and a CsvTableSource. Here's an example and some discussion of what it does and doesn't do. If you go this route, you'll need to use StreamTableEnvironment.toAppendStream() to convert your table stream to a DataStream before using CEP.
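The Table API route might look roughly like the sketch below. It assumes the Flink 1.4-era Table API that the question's documentation link refers to (later releases changed these entry points), plus the flink-table dependency. The column names, the `Event` case class, the `toEvent` helper, the table name, and the path are hypothetical.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Row

// Hypothetical record type matching the three assumed CSV columns.
case class Event(name: String, ts: Long, cnt: Int)

object CsvTableJob {
  // Convert a generic Row into the typed record used downstream (e.g. by CEP).
  def toEvent(row: Row): Event =
    Event(
      row.getField(0).asInstanceOf[String],
      row.getField(1).asInstanceOf[Long],
      row.getField(2).asInstanceOf[Int])

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env) // 1.4-era entry point

    // Describe the CSV schema; field names and types are assumptions.
    val source = CsvTableSource.builder()
      .path("/path/to/your_csv_file") // placeholder path
      .field("name", Types.STRING)
      .field("ts", Types.LONG)
      .field("cnt", Types.INT)
      .build()

    tEnv.registerTableSource("events", source)
    val table = tEnv.scan("events")

    // Convert the table back to a DataStream before handing it to CEP.
    val rows: DataStream[Row] = tEnv.toAppendStream[Row](table)
    val events: DataStream[Event] = rows.map(row => toEvent(row))

    events.print()
    env.execute("Read CSV via CsvTableSource")
  }
}
```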

Keep in mind that all of these approaches will simply read the file once and create a bounded stream from its contents. If you want a source that reads in an unbounded CSV stream, and waits for new rows to be appended, you'll need a different approach. You could use a custom source, or a socketTextStream, or something like Kafka.
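For the unbounded case, a socketTextStream is probably the quickest thing to try. The sketch below assumes CSV rows arrive one per line on localhost port 9999; the host, port, object name, and `parseLine` helper are placeholders, and the three-column layout (String, Long, Int) is an assumption as well.

```scala
import org.apache.flink.streaming.api.scala._

object CsvSocketJob {
  // Naive comma split into a tuple; assumes no quoted fields or embedded commas.
  def parseLine(line: String): (String, Long, Int) = {
    val cols = line.split(",").map(_.trim)
    (cols(0), cols(1).toLong, cols(2).toInt)
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Unbounded source: the job keeps running and processes rows as they arrive.
    val rows: DataStream[(String, Long, Int)] = env
      .socketTextStream("localhost", 9999) // placeholder host and port
      .filter(line => line.trim.nonEmpty)  // skip blank lines
      .map(line => parseLine(line))

    rows.print()
    env.execute("Read CSV rows from a socket")
  }
}
```

For a local test, something like `nc -lk 9999` can serve as the other end of the socket: start it before the job and type or paste CSV rows into it.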

0
votes

If you have a CSV file with three fields (String, Long, Integer), do the following:

val input = benv.readCsvFile[(String, Long, Integer)]("hdfs:///path/to/your_csv_file")

PS: I am using the Flink Scala shell, which is why I use benv (the shell's pre-bound batch environment).