I have a use case and I need some help on how to approach it. Because I am new to streaming and Flink, I will try to be very descriptive about what I am trying to achieve. Sorry if my language is not formal or precise enough.
My code will be in Java, but I am happy to get answers in Python, pseudocode, or just a description of the approach.
TL;DR
- Group events of the same key that are within some time limit.
- Out of those events, create a result event only from the two events that are closest to each other in time.
- This requires (I think) opening a window for each and every event that arrives.
- The batch solution further down probably shows my problem best.
Background:
- I have data coming from sensors as a stream from Kafka.
- I need to use event time because the data arrives out of order. Allowing a lateness of about 1 minute captures roughly 90% of the events.
- I am grouping those events by some key.
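
To make the lateness requirement concrete, here is a minimal plain-Java sketch (not Flink code) of what I understand "wait up to 1 minute for out-of-order events" to mean. In Flink this role is played by watermarks; the class `ReorderBuffer`, its `add` method, and the constant `MAX_LATENESS_MS` are hypothetical names I made up for illustration, and events are reduced to their timestamps in milliseconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration only: plain Java, not the Flink API.
class ReorderBuffer {
    // Assumed bound on out-of-orderness, taken from the "about 1 minute" figure above.
    static final long MAX_LATENESS_MS = 60_000L;

    private final PriorityQueue<Long> pending = new PriorityQueue<>();
    private long maxSeen = Long.MIN_VALUE;
    private long watermark = Long.MIN_VALUE;

    /** Adds one event timestamp and returns the timestamps that are now safe to emit, in order. */
    List<Long> add(long eventTimeMs) {
        List<Long> ready = new ArrayList<>();
        if (eventTimeMs <= watermark) {
            return ready; // too late (part of the ~10% I accept losing): dropped
        }
        pending.add(eventTimeMs);
        maxSeen = Math.max(maxSeen, eventTimeMs);
        watermark = maxSeen - MAX_LATENESS_MS;
        // Everything at or below the watermark can no longer be overtaken by an accepted event.
        while (!pending.isEmpty() && pending.peek() <= watermark) {
            ready.add(pending.poll());
        }
        return ready;
    }

    public static void main(String[] args) {
        ReorderBuffer buffer = new ReorderBuffer();
        for (long t : new long[] {100_000L, 70_000L, 130_000L, 50_000L, 170_000L}) {
            System.out.println(t + " -> released " + buffer.add(t));
        }
    }
}
```

The point of the sketch is only that events come out sorted by event time once the watermark passes them, which is the order my pairing logic needs.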
What I want to do:
- Depending on some of the events' fields, I would like to "join/mix" two events into a new event (a "result event").
- The first condition is that those consecutive events are WITHIN 30 seconds of each other.
- The remaining conditions simply check some field values and then decide.
My pseudo solution:
- Open a new window for EACH event. That window should be 1 minute long.
- For every event that arrives within that minute, I want to check its event time and see whether it is within 30 seconds of the event that opened the window. If yes, check the other conditions and emit a new result event.
The problem: when a new event arrives, it needs to:
- Create a new window for itself.
- Join only ONE window out of SEVERAL possible windows that are within 30 seconds of it.
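
To show what I mean by joining only ONE of several candidates, here is a small plain-Java sketch (again, not Flink code). It assumes that "closest" means the smallest time gap and that only earlier events are candidates. `ClosestMatch`, `closestEarlier`, and `MAX_GAP_MS` are hypothetical names used only for this illustration.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical illustration only: plain Java, not the Flink API.
class ClosestMatch {
    // The 30-second limit from the description, in milliseconds.
    static final long MAX_GAP_MS = 30_000L;

    /** Returns the earlier timestamp nearest to newEventMs that is less than MAX_GAP_MS away, if any. */
    static OptionalLong closestEarlier(List<Long> earlierMs, long newEventMs) {
        OptionalLong best = OptionalLong.empty();
        for (long t : earlierMs) {
            long gap = newEventMs - t;
            if (gap < 0 || gap >= MAX_GAP_MS) {
                continue; // later than the new event, or too far back
            }
            if (!best.isPresent() || t > best.getAsLong()) {
                best = OptionalLong.of(t); // a later timestamp means a smaller gap
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Two of the three earlier events are within 30 s; only the nearest should be chosen.
        System.out.println(closestEarlier(List.of(1_000L, 12_000L, 25_000L), 40_000L));
    }
}
```

With windows, each of the earlier events would have its own open window, and I do not see how to make the new event land in only the nearest one.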
The question:
Is that possible?
In other words, the connection I care about is only between two "consecutive" events.
Thank you very much.
Maybe showing the solution for the **batch case** will best show what I am trying to do:

    for i in range(len(grouped_events) - 1):
        event_A = grouped_events[i]
        event_B = grouped_events[i + 1]
        if event_B.get("time") - event_A.get("time") < 30:
            if event_B.get("color") == event_A.get("color"):
                if event_B.get("size") > event_A.get("size"):
                    create_result_event(event_A, event_B)

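
Since my real code is in Java, here is roughly the same batch logic as a self-contained Java sketch. The `Event` class with `timeSec`, `color`, and `size` fields is a hypothetical stand-in for my real event type, and `BatchPairing` and `pairConsecutive` are made-up names. The sketch assumes the events of one key are sorted by time before pairing.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration only: plain Java batch version, not the Flink API.
class BatchPairing {
    /** Stand-in for my real event type; field names follow the pseudocode above. */
    static final class Event {
        final long timeSec;
        final String color;
        final float size;

        Event(long timeSec, String color, float size) {
            this.timeSec = timeSec;
            this.color = color;
            this.size = size;
        }
    }

    /** Returns every consecutive pair (A, B) of one key's events that satisfies all three conditions. */
    static List<Event[]> pairConsecutive(List<Event> groupedEvents) {
        List<Event> sorted = new ArrayList<>(groupedEvents);
        sorted.sort(Comparator.comparingLong(e -> e.timeSec)); // consecutive means adjacent in event time

        List<Event[]> results = new ArrayList<>();
        for (int i = 0; i + 1 < sorted.size(); i++) {
            Event a = sorted.get(i);
            Event b = sorted.get(i + 1);
            boolean closeInTime = b.timeSec - a.timeSec < 30;
            if (closeInTime && b.color.equals(a.color) && b.size > a.size) {
                results.add(new Event[] {a, b}); // this is where a result event would be created
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(50, "red", 3f),
                new Event(0, "red", 1f),
                new Event(10, "red", 2f),
                new Event(60, "blue", 4f));
        System.out.println("pairs found: " + pairConsecutive(events).size());
    }
}
```

Each event is compared only with its direct successor, which is exactly the behaviour I cannot reproduce with Flink windows.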
My (naive) attempts so far with Flink in Java:
Note: the sum function is just a placeholder for my own function that creates a new result object.
- The first attempt is a simple time window that sums by some field.
- The second attempt uses a process function on the window, where I could maybe iterate through all events and check my conditions?

    // "stream" stands for my DataStream<ThreeEvent> read from Kafka.

    // First attempt: tumbling window with a placeholder sum.
    stream
        .keyBy(threeEvent -> threeEvent.getUserId())
        .window(TumblingEventTimeWindows.of(Time.seconds(60)))
        .sum("size")
        .print();

    // Second attempt: process all events of a window at once.
    stream
        .keyBy(threeEvent -> threeEvent.getUserId())
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .process(new processFunction());

    public static class processFunction
            extends ProcessWindowFunction<ThreeEvent, Tuple3<Long, Long, Float>, Long, TimeWindow> {
        @Override
        public void process(Long key, Context context, Iterable<ThreeEvent> threeEvents,
                            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
            // Placeholder aggregation: sum the sizes of all events in the window.
            Float sumOfSize = 0F;
            for (ThreeEvent f : threeEvents) {
                sumOfSize += f.getSize();
            }
            out.collect(new Tuple3<>(context.window().getEnd(), key, sumOfSize));
        }
    }