I am trying to calculate the average of an input DataStream in Flink, without using windows.

I have used a mapper to change the stream from (key, value) to (key, value, 1).

Now I need to sum both the 2nd and the 3rd field per key and divide the first sum by the second.

The input data stream comes from a socket connection, one record per line in the form 'KEY VALUE', e.g. 'X 5'.

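```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.util.Collector;

public class AvgViews {

    // dataStream is the DataStream<String> read from the socket ("X 5", ...)
    static KeyedStream<Tuple3<String, Double, Integer>, String> keyedViews(
            DataStream<String> dataStream) {
        return dataStream
                .flatMap(new AvgViews.RowSplitter())
                .keyBy(value -> value.f0);
        // .???  <- sum fields 1 and 2 per key, divide them, and end up with
        //          a DataStream<Tuple2<String, Double>> (key, average)
    }

    // Turns "KEY VALUE" into (KEY, VALUE, 1). A FlatMapFunction is used
    // instead of a MapFunction so malformed lines can simply be dropped:
    // Flink does not support null stream records, so returning null from
    // a MapFunction fails at runtime.
    public static class RowSplitter implements
            FlatMapFunction<String, Tuple3<String, Double, Integer>> {

        @Override
        public void flatMap(String row, Collector<Tuple3<String, Double, Integer>> out)
                throws Exception {
            String[] fields = row.split(" ");
            if (fields.length == 2) {
                out.collect(new Tuple3<>(fields[0], Double.parseDouble(fields[1]), 1));
            }
            // Lines that are not exactly "KEY VALUE" are skipped.
        }
    }
}
```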

Answer


You can apply a RichMapFunction (or RichFlatMapFunction) to the keyed stream and keep a Tuple2 of (running sum, count) in keyed state. For each incoming record, add its value and its count to the state, then emit sum / count as the current average for that key.
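A minimal sketch of that suggestion, assuming the keyed stream of (key, value, 1) tuples from the question and the Flink 1.x DataStream API. The class name `RunningAverage` and the state name `"sumAndCount"` are hypothetical. It stores a per-key `Tuple2<Double, Integer>` (sum, count) in `ValueState` and emits `(key, sum / count)` for every record.

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;

// Hypothetical class name. Must be applied to a KeyedStream so that the
// state below is scoped to the current key.
public class RunningAverage
        extends RichMapFunction<Tuple3<String, Double, Integer>, Tuple2<String, Double>> {

    // Per-key running totals: f0 = sum of values, f1 = number of records.
    private transient ValueState<Tuple2<Double, Integer>> sumAndCount;

    @Override
    public void open(Configuration parameters) {
        TypeInformation<Tuple2<Double, Integer>> stateType =
                Types.TUPLE(Types.DOUBLE, Types.INT);
        sumAndCount = getRuntimeContext().getState(
                new ValueStateDescriptor<>("sumAndCount", stateType));
    }

    @Override
    public Tuple2<String, Double> map(Tuple3<String, Double, Integer> record) throws Exception {
        Tuple2<Double, Integer> current = sumAndCount.value();
        if (current == null) {
            // First record for this key: start from zero.
            current = Tuple2.of(0.0, 0);
        }
        current.f0 += record.f1;  // add the value (2nd field)
        current.f1 += record.f2;  // add the count (3rd field, always 1 here)
        sumAndCount.update(current);

        // Emit the running average for this key.
        return Tuple2.of(record.f0, current.f0 / current.f1);
    }
}
```

It would take the place of the `.???` step, i.e. `.keyBy(...)` followed by `.map(new RunningAverage())`. Because it emits on every record, the output is a running (ever-updating) average per key rather than a windowed one. Newer Flink releases deprecate `open(Configuration)` in favour of `open(OpenContext)`, so the `open` signature may need adjusting for your version.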

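The CountWindowAverage example in the Flink documentation on working with keyed state does something similar, though it is a bit more complex: it emits an average only after every two elements and then clears the state.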
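The per-key bookkeeping can be checked without a Flink cluster. The plain-Java sketch below uses a `HashMap` as a hypothetical stand-in for keyed state (the class and method names are illustrative only) and applies the same update to each (key, value, 1) record: add the value and the count, then return sum / count.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Flink keyed state: one (sum, count) pair per key.
class RunningAverageSketch {
    private final Map<String, double[]> sumAndCount = new HashMap<>();

    // Applies one (key, value, count) record to that key's totals and
    // returns the key's current average, sum / count.
    double update(String key, double value, int count) {
        double[] state = sumAndCount.computeIfAbsent(key, k -> new double[2]);
        state[0] += value;  // running sum of the 2nd field
        state[1] += count;  // running sum of the 3rd field
        return state[0] / state[1];
    }

    public static void main(String[] args) {
        RunningAverageSketch avg = new RunningAverageSketch();
        String[] lines = {"X 5", "Y 2", "X 7", "X 3"};
        for (String line : lines) {
            String[] fields = line.split(" ");
            double average = avg.update(fields[0], Double.parseDouble(fields[1]), 1);
            System.out.println(fields[0] + " " + average);
        }
    }
}
```

Running `main` prints `X 5.0`, `Y 2.0`, `X 6.0`, `X 5.0`: each key keeps its own totals, so the `Y` record does not affect the average for `X`.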