I have a Flink streaming job that receives clickstream data.
Data format:
{"uid":"123", "event_type":"view","payload":{"p1":{"price":23}}}
{"uid":"123", "event_type":"view","payload":{"p2":{"price":25}}}
{"uid":"123", "event_type":"a2c","payload":{"p2":{}}}
{"uid":"123", "event_type":"a2c","payload":{"p1":{}}}
As shown, there are two types of events: a2c and view. The difference is that view events carry a price for every product, while a2c events carry only the product name and no price.
What I want to achieve is a consolidated payload built from all the events in a 10-minute window, with the a2c payloads enriched with the price taken from the corresponding view event.
Consolidated payload for each uid once the window finishes:
{
"uid":"123",
"all":[
{"event_type":"view", "payload":{"p1":{"price":23}}},
{"event_type":"view","payload":{"p2":{"price":25}}},
{"event_type":"a2c","payload":{"p2":{"price":25}}},
{"event_type":"a2c","payload":{"p1":{"price":23}}}
],
"total":4
}
How can I achieve this?
Basically, I have to keep state for all the view events in the window, and when an a2c event arrives, fetch its price from that state.
I am not asking for a working solution, just how to maintain the state of all the view events in the window.
I also have a custom reduce operation:
events.keyBy("uid").window(..).reduce(new ReduceCustomFun(..)).uid("..").name("..");
In ReduceCustomFun, I merge the data of two events into a list.
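To make the price lookup described above concrete, here is a minimal sketch of the logic I have in mind, written in plain Java without any Flink dependencies. The names ClickEvent, WindowEnrichmentSketch and enrich are hypothetical, and the payload is simplified to a map from product name to price (null when the event has no price). I assume something like this would run once per uid over all events of the window, for example inside a ProcessWindowFunction, but I am not sure that is the right place for it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified event: payload maps product name -> price,
// where the price is null for events that carry no price (a2c).
final class ClickEvent {
    final String uid;
    final String eventType;
    final Map<String, Integer> payload;

    ClickEvent(String uid, String eventType, Map<String, Integer> payload) {
        this.uid = uid;
        this.eventType = eventType;
        this.payload = payload;
    }
}

final class WindowEnrichmentSketch {

    // Assumption: called with every event of one uid for one window.
    static List<ClickEvent> enrich(List<ClickEvent> windowEvents) {
        // Pass 1: remember the price of every product seen in a view event.
        Map<String, Integer> priceByProduct = new HashMap<>();
        for (ClickEvent e : windowEvents) {
            if ("view".equals(e.eventType)) {
                for (Map.Entry<String, Integer> p : e.payload.entrySet()) {
                    if (p.getValue() != null) {
                        priceByProduct.put(p.getKey(), p.getValue());
                    }
                }
            }
        }

        // Pass 2: copy events in order, filling in prices for a2c events.
        List<ClickEvent> all = new ArrayList<>();
        for (ClickEvent e : windowEvents) {
            if ("a2c".equals(e.eventType)) {
                Map<String, Integer> enriched = new LinkedHashMap<>();
                for (String product : e.payload.keySet()) {
                    // Stays null if the product was never viewed in this window.
                    enriched.put(product, priceByProduct.get(product));
                }
                all.add(new ClickEvent(e.uid, e.eventType, enriched));
            } else {
                all.add(e);
            }
        }
        return all;
    }
}
```

Doing two passes means an a2c event still gets its price even if it arrives before the matching view event within the same window. The "total" field of the consolidated payload would simply be the size of the returned list.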