2
votes

I am just starting out with Apache Flink using Scala. Can someone please tell me how to create a lagged stream (lagged by k events or by k units of time) from a DataStream that I already have?

Basically, I want to implement an autoregressive model (linear regression of the stream on a time-lagged version of itself) on a data stream. So I need a method similar to the following pseudocode.

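val ds: DataStream[T] = ...

// lag the whole stream by k (pseudocode, T is a placeholder element type)
val laggedDS: DataStream[T] = lag(ds, k)

def lag[T](ds: DataStream[T], k: Time): DataStream[T] = {
  ??? // the part I do not know how to write
}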

If events arrive at 1-second intervals and the lag is 2 seconds, I expect input and output like this:

Input : 1, 2, 3, 4, 5, 6, 7...
Output: NA, NA, 1, 2, 3, 4, 5...

1
Can you extend your question and explain what you mean by lagged stream? Thanks - Fabian Hueske
@FabianHueske, I think by a lagged datastream he means that elements are emitted later than they arrive. For example, a 1 minute lag will emit each element 1 minute after it arrived in the stream. - Piyush Shrivastava
The question says "lagged by k events" and not "lagged by x minutes". One interpretation would be to append new events to a FIFO queue of k events and forward the queue's head element when a new event arrives. Without a clear definition of the desired semantics, the question cannot be answered. - Fabian Hueske
@FabianHueske If the stream has an inconsistent frequency, I would like to lag it by counts and not by time. Also edited the question. Hope that helps. :) - Kauchy
Although I would very much like to know the time variant too. :) - Kauchy

1 Answer

5
votes

Assuming I understand your requirements correctly, I would implement this as a FlatMapFunction with a FIFO queue. The queue buffers k events and, once it is full, emits its head whenever a new event arrives. If you need a fault-tolerant streaming application, the queue must be registered as state. Flink will then take care of checkpointing the state (i.e., the queue) and restoring it in case of a failure.

The FlatMapFunction could look like this:

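import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.checkpoint.Checkpointed
import org.apache.flink.util.Collector

import scala.collection.mutable

// X is a placeholder for the element type of your stream
class Lagger(val k: Int)
    extends FlatMapFunction[X, X]
    with Checkpointed[mutable.Queue[X]]
{

  // buffer holding the k most recent events
  var fifo: mutable.Queue[X] = new mutable.Queue[X]()

  override def flatMap(value: X, out: Collector[X]): Unit = {
    // add new element to queue
    fifo.enqueue(value)
    // nothing is emitted for the first k events (the "NA" positions)
    if (fifo.size == k + 1) {
      // remove head element and emit
      out.collect(fifo.dequeue())
    }
  }

  // restore state
  override def restoreState(state: mutable.Queue[X]): Unit = { fifo = state }

  // get state to checkpoint
  override def snapshotState(cId: Long, cTS: Long): mutable.Queue[X] = fifo

}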
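
To check the queue logic without running a Flink job, the same FIFO idea can be sketched in plain Scala. This is only an illustration: lagByCount is a hypothetical helper, not part of Flink, and unlike Lagger it returns None for the first k events so that the result lines up with the NA placeholders in the question.

```scala
import scala.collection.mutable

object LagByCountSketch {
  // Hypothetical helper (not a Flink API): lags a finite sequence by k events.
  // The first k positions are None, matching the "NA" entries in the question.
  def lagByCount[A](input: Seq[A], k: Int): List[Option[A]] = {
    val fifo = mutable.Queue.empty[A]
    input.iterator.map { value =>
      // add new element to queue
      fifo.enqueue(value)
      // once k + 1 elements are buffered, release the oldest one
      if (fifo.size == k + 1) Some(fifo.dequeue()) else None
    }.toList
  }
}
```

With the question's input, LagByCountSketch.lagByCount(Seq(1, 2, 3, 4, 5, 6, 7), 2) gives None, None, Some(1), Some(2), Some(3), Some(4), Some(5).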

Returning elements with a time lag is more involved. This would require timer threads for the emission, because flatMap is only called when a new element arrives and so cannot emit anything on its own once the lag has elapsed.
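
One way to picture the time-based variant is a buffer that stores each element together with the time at which it becomes due, plus a timer callback that flushes whatever is due. The sketch below simulates this in plain Scala with an explicit clock. TimeLagBuffer, add, and onTimer are hypothetical names, not Flink APIs, and the sketch assumes that timestamps arrive in non-decreasing order. In a real job the onTimer call would have to be driven by a timer rather than by the caller.

```scala
import scala.collection.mutable

// Hypothetical simulation, not a Flink operator.
// Times are plain Longs (e.g. milliseconds); timestamps are assumed non-decreasing.
final class TimeLagBuffer[A](lag: Long) {
  // (release time, element) pairs in arrival order
  private val pending = mutable.Queue.empty[(Long, A)]

  // Buffer an element that arrived at `timestamp`; it becomes due at timestamp + lag.
  def add(timestamp: Long, value: A): Unit =
    pending.enqueue((timestamp + lag, value))

  // Stand-in for a timer firing at time `now`: return every element that is due.
  def onTimer(now: Long): List[A] = {
    val due = mutable.ListBuffer.empty[A]
    while (pending.nonEmpty && pending.head._1 <= now) {
      due += pending.dequeue()._2
    }
    due.toList
  }
}
```

The point of the design is that emission is decoupled from arrival: add only buffers, and elements leave the buffer solely when the (simulated) timer fires.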