
I want to apply a time window to streaming data in Apache Flink. My data looks somewhat like this:

1> {52,"mokshda",84.85}
2> {1,"kavita",26.16}
2> {131,"nidhi",178.9}
3> {2,"poorvi",22.97}
4> {115,"saheba",110.41}

Every 20 seconds, I want the sum of the marks (the last column; e.g. Mokshda's marks are 84.85) over all the rows. The timeWindow() function operates on a KeyedStream, so I have to keyBy() this DataStream first. I can key it by roll number (the first column; e.g. 52 for Mokshda).

val windowedStream = stockStream
                        .keyBy(0)
                        .timeWindow(Time.seconds(20))
                        .sum(2)

But Flink is not reading my data as a list. It reads each record as a String, so I get the following exception:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Specifying keys via field positions is only valid for tuple data types. Type: String

How can I perform a timeWindow on String data, or how can I convert this data into a Tuple?


1 Answer


You can convert a DataStream[String] into a DataStream[(Int, String, Double)] using a MapFunction[String, (Int, String, Double)] which parses a String into its components, converts the data types and emits a Tuple.
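The parsing step could look roughly like the sketch below. It assumes each record arrives as a line such as {52,"mokshda",84.85} (the "1>" prefix in the question looks like the subtask index added by print(), not part of the data) and that names contain no commas. ParseRecord and parseRecord are hypothetical names chosen for illustration.

```scala
object ParseRecord {
  // Hypothetical helper: turns a line like {52,"mokshda",84.85}
  // into a (roll number, name, marks) tuple.
  // Assumption: exactly three comma-separated fields, no commas inside the name.
  def parseRecord(line: String): (Int, String, Double) = {
    val fields = line.trim
      .stripPrefix("{")
      .stripSuffix("}")
      .split(",")
      .map(_.trim)
    require(fields.length == 3, s"expected 3 fields but got ${fields.length}: $line")

    val roll  = fields(0).toInt
    val name  = fields(1).stripPrefix("\"").stripSuffix("\"") // drop surrounding quotes
    val marks = fields(2).toDouble
    (roll, name, marks)
  }
}
```

With the Scala DataStream API in scope (import org.apache.flink.streaming.api.scala._), this should be usable as stockStream.map(line => ParseRecord.parseRecord(line)), after which keyBy(0) and sum(2) refer to tuple positions rather than to a String.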

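You can also apply timeWindowAll to a non-keyed data stream. The semantics are different, though: a keyed window produces one result per key (here, one sum per roll number), whereas an all-window produces a single result over every record in the window. An AllWindow can only be processed with parallelism 1.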
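Since the question asks for the sum over all rows, a non-keyed window may be the closer fit. The following is a minimal sketch, assuming a Flink version in which the Scala DataStream API and timeWindowAll are still available, and assuming the records arrive as text lines in the format shown in the question. The socket source on localhost:9999, the object name WindowedMarks, and the parse helper are hypothetical stand-ins.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowedMarks {
  // Hypothetical parser for lines like {52,"mokshda",84.85}.
  // Assumption: exactly three fields and no commas inside the name.
  def parse(line: String): (Int, String, Double) = {
    val Array(roll, name, marks) =
      line.trim.stripPrefix("{").stripSuffix("}").split(",").map(_.trim)
    (roll.toInt, name.stripPrefix("\"").stripSuffix("\""), marks.toDouble)
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Hypothetical source; replace with however stockStream is actually created.
    val stockStream: DataStream[String] = env.socketTextStream("localhost", 9999)

    val totals: DataStream[Double] = stockStream
      .map(line => parse(line))        // String -> (Int, String, Double)
      .timeWindowAll(Time.seconds(20)) // one non-keyed window every 20 seconds
      .sum(2)                          // add up the marks (tuple position 2)
      .map(_._3)                       // keep only the summed field

    totals.print()
    env.execute("Sum of marks every 20 seconds")
  }
}
```

The final map keeps only the third field because, after sum(2), the roll number and name in the result tuple do not describe the whole window and should not be relied on. Replacing timeWindowAll with keyBy(0).timeWindow(...) would instead give one sum per roll number.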