
I want to create keyed windows in Apache Flink such that the window for each key fires n minutes after the arrival of the first event for that key. Can this be done with the event time characteristic (processing time depends on the system clock, and it is uncertain when the first event will arrive)? If so, please explain how to assign event time and watermarks to the events, and how to invoke the process window function after n minutes.

Below is part of my code, to give you an idea of what I am currently doing:

            //Make keyed events so as to start a window for a key
            KeyedStream<SourceData, Tuple> keyedEvents = 
                    env.addSource(new MySource(configData),"JSON Source")
                    .assignTimestampsAndWatermarks(new MyTimeStamps())
                    .setParallelism(1)
                    .keyBy("service");


            //Start a window for windowTime time
            DataStream<ResultData> resultData=
                    keyedEvents
                    .timeWindow(Time.minutes(winTime))
                    .process(new ProcessEventWindow(configData))
                    .name("Event Collection Window")
                    .setParallelism(25);

So, how would I assign the event time and watermark so that the window uses the event time of the first event as its starting point and fires after 10 minutes (the time of the first event can differ between keys)? Any help would be really appreciated.

        /------------ ( window of 10 minutes )
Streams |------------ ( window of 10 minutes )
        \------------ ( window of 10 minutes )

Edit: the class I used for assigning timestamps and watermarks:

public class MyTimeStamps implements AssignerWithPeriodicWatermarks<SourceData> {

    @Override
    public long extractTimestamp(SourceData element, long previousElementTimestamp) {

          //Will return epoch of currentTime
        return GlobalUtilities.getCurrentEpoch();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // TODO Auto-generated method stub
        //Will return epoch of currentTime + 10 minutes
        return new Watermark(GlobalUtilities.getTimeShiftNMinutesEpoch(10));
    }

}

2 Answers

1
votes

I think for your use case it would be best to use a ProcessFunction. You could register an event-time timer (registerEventTimeTimer) when the first event for a key arrives, then emit the results in the onTimer method.

Something like:

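import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Apply this to a keyed stream, e.g. keyedEvents.process(new ProcessFunctionImpl()),
// because keyed state and timers are scoped to the current key.
public class ProcessFunctionImpl extends ProcessFunction<SourceData, ResultData> {

    // per-key aggregate; null until the first event for the key arrives
    private transient ValueState<ResultData> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(
            new ValueStateDescriptor<>("aggregate", ResultData.class));
    }

    @Override
    public void processElement(SourceData value, Context ctx, Collector<ResultData> out)
        throws Exception {

        // retrieve the current aggregate
        ResultData current = state.value();
        if (current == null) {
            // first event arrived
            current = new ResultData();
            // register end of window
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10 * 60 * 1000 /* 10 minutes */);
        }

        // update the state's aggregate; add() is a placeholder for your own merge logic
        current.add(value);

        // write the state back
        state.update(current);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultData> out)
        throws Exception {

        // get the state for the key that scheduled the timer
        ResultData result = state.value();

        out.collect(result);

        // reset the window state so the next event for this key starts a new window
        state.clear();
    }
}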
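
To see the timer logic in isolation, here is a plain-Java sketch with no Flink dependency. It only mimics the idea: the first event for a key sets a deadline at its timestamp plus the window length, and the key's result is emitted once the watermark reaches that deadline. The class FirstEventWindowSim and its methods are made up for illustration; in a real job Flink's timer service and keyed state do this for you.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Illustration only: a hand-rolled stand-in for keyed state and event-time timers.
final class FirstEventWindowSim {
    private final long windowMillis;
    // key -> event-time deadline (first event timestamp + window length)
    private final Map<String, Long> deadlines = new HashMap<>();
    // key -> number of events collected since the first event
    private final Map<String, Integer> counts = new HashMap<>();

    FirstEventWindowSim(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Plays the role of processElement: only the first event of a key sets the deadline.
    void onEvent(String key, long timestamp) {
        deadlines.putIfAbsent(key, timestamp + windowMillis);
        counts.merge(key, 1, Integer::sum);
    }

    // Plays the role of onTimer: fires every key whose deadline is <= the watermark,
    // then clears that key so its next event starts a fresh window.
    List<String> onWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= watermark) {
                fired.add(entry.getKey() + "=" + counts.remove(entry.getKey()));
                it.remove();
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        FirstEventWindowSim sim = new FirstEventWindowSim(10 * 60 * 1000L);
        sim.onEvent("serviceA", 1_000L);
        sim.onEvent("serviceB", 300_000L);
        sim.onEvent("serviceA", 400_000L);
        System.out.println(sim.onWatermark(601_000L)); // serviceA's deadline is reached
        System.out.println(sim.onWatermark(900_000L)); // serviceB's deadline is reached
    }
}
```

Each key gets its own deadline, so keys whose first events arrive at different times fire at different times, which is the behaviour asked for in the question.
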
0
votes

I had a similar question a while ago regarding event time windows. Here's what my stream looks like:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

//Consumer Setup

val stream = env.addSource(consumer)
  .assignTimestampsAndWatermarks(new WMAssigner)

// Additional Setup here

stream
  .keyBy { data => data.findValue("service") }
  .window(TumblingEventTimeWindows.of(Time.minutes(10)))
  .process { new WindowProcessor }

  //Sinks go here
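
One caveat with tumbling windows: as far as I know they are aligned to the epoch, not to the first event of each key, so a 10 minute window can fire less than 10 minutes after a key's first event. Here is a rough plain-Java sketch of the arithmetic. The class and method names are mine, and the formula assumes non-negative timestamps and no window offset.

```java
// Illustration only: how an epoch-aligned tumbling window is chosen for a timestamp.
final class TumblingAlignment {

    // Start of the tumbling window that contains the timestamp
    // (non-negative timestamps, no window offset).
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        // hypothetical first event, 7.5 minutes after a window boundary
        long firstEvent = 7 * 60 * 1000L + 30_000L;

        long start = windowStart(firstEvent, tenMinutes);
        long end = start + tenMinutes;

        System.out.println("window = [" + start + ", " + end + ")");
        System.out.println("window ends " + (end - firstEvent) + " ms after the first event");
    }
}
```

In this example the window ends 2.5 minutes after the first event rather than 10, so if you need exactly n minutes from the first event per key, a timer-based approach is probably the better fit.
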

My WMAssigner class looked like this (note: it allows events to arrive up to 1 minute out of order; you can extend a different timestamp extractor if you don't want to allow for lateness):

class WMAssigner extends BoundedOutOfOrdernessTimestampExtractor[ObjectNode] (Time.seconds(60)) {
  override def extractTimestamp(element: ObjectNode): Long = {
    val tsStr = element.findValue("data").findValue("ts").toString replaceAll("\"", "")
    tsStr.toLong
  }
}
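
To make the 1 minute bound concrete, here is a plain-Java sketch of what a bounded out-of-orderness assigner does conceptually: the watermark trails the largest timestamp seen so far by the bound, and an event-time window can fire once the watermark reaches the window's last millisecond. The class BoundedLatenessSketch and its methods are hypothetical names for illustration, not Flink API.

```java
// Illustration only: watermark = max timestamp seen - allowed out-of-orderness.
final class BoundedLatenessSketch {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestampSeen = Long.MIN_VALUE;

    BoundedLatenessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    // Called for every event; out-of-order events never move the maximum backwards.
    void observe(long eventTimestamp) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestamp);
    }

    // The watermark lags the newest timestamp by the configured bound.
    long currentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no events yet
        }
        return maxTimestampSeen - maxOutOfOrdernessMillis;
    }

    // A window [start, end) may fire once the watermark reaches end - 1.
    boolean windowCanFire(long windowEndMillis) {
        return currentWatermark() >= windowEndMillis - 1;
    }

    public static void main(String[] args) {
        BoundedLatenessSketch wm = new BoundedLatenessSketch(60_000L);
        wm.observe(600_000L);
        System.out.println("watermark = " + wm.currentWatermark());
        System.out.println("window ending at 600000 can fire: " + wm.windowCanFire(600_000L));
    }
}
```

So with a 60 second bound, a window ending at minute 10 only fires after an event with a timestamp of roughly minute 11 has been seen.
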

The timestamp I wanted to use for watermarks was the data.ts field.

My WindowProcessor:

class WindowProcessor extends ProcessWindowFunction[ObjectNode, String, String, TimeWindow] {
  override def process(key: String, context: Context, elements: Iterable[ObjectNode], out: Collector[String]): Unit = {
    // keep the outData field of the last element in the window
    // (named result so it does not shadow the Collector parameter out)
    var result = ""
    elements.foreach { value =>
      result = value.findValue("data").findValue("outData").asText()
    }
    out.collect(result)
  }
}

Let me know if anything is unclear.