
I have a DataStream of Kafka events (readings from devices) fed into the following code, which produces a per-device average of the reading over a sliding window. This works fine. Next, I want to calculate the sum of all the per-device averages in the same window; this is the part I am not able to express syntactically.

This part works:

val stream = env
  // read device events from the Kafka topic
  .addSource(kafkaConsumer)
  // configure timestamp and watermark assigner
  .assignTimestampsAndWatermarks(new DeviceTSAssigner)
  .keyBy(_.deviceIdFull)
  // sliding window: 5 minutes long, advancing every minute
  .timeWindow(Time.minutes(5), Time.minutes(1))
  // emit (device, window end, average reading) for each device and window
  .apply { (key: String, window: TimeWindow, events: Iterable[DeviceData], out: Collector[(String, Long, Double)]) =>
    out.collect((key, window.getEnd, events.map(_.currentReading).sum / events.size))
  }

stream.print()

The output is something like

(device1,1530681420000,0.0)
(device2,1530681420000,0.0)
(device3,1530681480000,0.0)
(device4,1530681480000,0.0)
(device5,1530681480000,52066.0)
(device6,1530681480000,69039.0)
(device7,1530681480000,79939.0)
... 
...

The following code is the part I am having problems with. I am not sure exactly how to write it, but I think it should be something like this:

  val avgStream = stream
    .keyBy(2) // 2 represents the window.End from stream, see code above
    .timeWindow(Time.minutes(1)) // tumbling window
    .apply { (
               key: Long,
               window: TimeWindow,
               events: Iterable[(String, Long, Double)],
               out: Collector[(Long, Double)]) =>
      out.collect( (key, events.map( _._3 ).sum ))
    }

I get the following error while compiling this code:

Error:(70, 52) type mismatch;
 found   : (Long, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[(String, Long, Double)], org.apache.flink.util.Collector[(Long, Double)]) => Unit
 required: (org.apache.flink.api.java.tuple.Tuple, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[(String, Long, Double)], org.apache.flink.util.Collector[?]) => Unit
                   out: Collector[(Long, Double)]) =>

I tried other variants as well, such as using an AggregateFunction, but could not get past compilation. It seems from the error that I need to convert the input stream elements into tuples; I have seen some code that does this, but I am not sure exactly how to do it. I am completely new to Scala, so I think that is the main problem here, because what I want to do is not complex.

Updated 07/04/2018

I think I have a solution to my question. It seems to work OK, but I would still like to keep this open in the hope that someone else can comment on it (the question as well as my solution).

Basically, I removed the first field (the device name) with a map, since we don't need it. I then did a keyBy on the timestamp (the window end from the previous stage), windowed the events into a tumbling window, and summed the 2nd field (index 1, 0-based), which is the average reading from the previous stage.

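val avgStream = stream
  // drop the device name, keeping (window end, per-device average)
  .map(r => (r._2, r._3))
  .keyBy(0)
  // tumbling window; note that timeWindowAll is a non-keyed window over the whole stream
  .timeWindowAll(Time.minutes(1))
  // sum the averages (field index 1)
  .sum(1)
  .print()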
I think you mean .keyBy(1) if you want to use the second field of your records as the key. Indices start at zero. – 0x26res
The mapped stream has only two fields, i.e. (timestamp, per-device avg reading), so to keyBy the first field the index would be 0. – Amit Arora

1 Answer


I was able to answer my own question. The approach described above (see the update of 07/04/2018) works, but a better way to do this, especially if you want to aggregate several fields of the stream rather than just one, is to use an AggregateFunction. I had tried that earlier as well, but ran into problems because the "map" step was missing.

Once I mapped the stream in the second stage to extract the fields of interest, I could use the AggregateFunction.
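A minimal sketch of what this can look like, assuming Flink 1.4 or later (where AggregateFunction.add returns the accumulator) and the Scala API. The names SumAverages, SumAveragesJob and sumPerWindow are made up for illustration, and the input is assumed to be the (device, window end, average) stream from the question:

```scala
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical aggregate: sums the second field of (windowEnd, averageReading) pairs.
// Input, accumulator and result are all (windowEnd, runningSum).
class SumAverages extends AggregateFunction[(Long, Double), (Long, Double), (Long, Double)] {
  override def createAccumulator(): (Long, Double) = (0L, 0.0)

  // Carry the window end of the incoming value and add its average to the running sum.
  override def add(value: (Long, Double), acc: (Long, Double)): (Long, Double) =
    (value._1, acc._2 + value._2)

  override def getResult(acc: (Long, Double)): (Long, Double) = acc

  // Combine two partial sums.
  override def merge(a: (Long, Double), b: (Long, Double)): (Long, Double) =
    (math.max(a._1, b._1), a._2 + b._2)
}

object SumAveragesJob {
  // `stream` is assumed to be the output of the first stage: (device, windowEnd, average).
  def sumPerWindow(stream: DataStream[(String, Long, Double)]): DataStream[(Long, Double)] =
    stream
      .map(r => (r._2, r._3))       // the "map" step: keep only (windowEnd, average)
      .keyBy(_._1)                  // key by the window end of the first stage
      .timeWindow(Time.minutes(1))  // tumbling window
      .aggregate(new SumAverages)
}
```

To aggregate more fields, the accumulator tuple can be widened (for example with a count or a maximum) without changing the pipeline itself.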

The Flink documentation here and this GitHub link both provide an example of this. I started with the Flink documentation example because it was very simple to understand, and then converted my code to look more like the GitHub example.
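As a side note on the compile error in the question: keyBy with a field index, as in keyBy(2), produces a key typed as Flink's Java Tuple, which is what the "required" line of the error shows. The window end is also at index 1, not 2. A sketch of the original apply-based attempt using a key selector function instead, so that the key is a plain Long; it assumes a Flink version that still offers timeWindow, and the names SumOfAverages, DeviceAvg, sumAverages and sumPerWindow are hypothetical:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object SumOfAverages {
  // (deviceId, windowEnd, averageReading), the element type emitted by the first stage
  type DeviceAvg = (String, Long, Double)

  // Window function: sums the per-device averages that share one window end.
  val sumAverages: (Long, TimeWindow, Iterable[DeviceAvg], Collector[(Long, Double)]) => Unit =
    (windowEnd, _, events, out) => out.collect((windowEnd, events.map(_._3).sum))

  // `stream` is assumed to be the DataStream produced by the first stage in the question.
  def sumPerWindow(stream: DataStream[DeviceAvg]): DataStream[(Long, Double)] =
    stream
      .keyBy(_._2)                  // key selector function, so the key type is Long
      .timeWindow(Time.minutes(1))  // tumbling window
      .apply(sumAverages)
}
```

With the key selector, the first parameter of the window function can be declared as Long, which is the type the original attempt expected.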