I have a Flink streaming system where I get click stream data.

data format:

{"uid":"123", "event_type":"view","payload":{"p1":{"price":23}}}
{"uid":"123", "event_type":"view","payload":{"p2":{"price":25}}}
{"uid":"123", "event_type":"a2c","payload":{"p2":{}}}
{"uid":"123", "event_type":"a2c","payload":{"p1":{}}}

As shown, there are two types of events: a2c and view. The difference is that view events carry the price for each product, while a2c events carry only the product name, not the price.

What I want is to build a consolidated payload from all the events in a 10-minute window, and to enrich the a2c payloads with the price from the corresponding view event.

Consolidated payload for each uid once the window finishes:

{
    "uid":"123",
    "all":[
        {"event_type":"view", "payload":{"p1":{"price":23}}},
        {"event_type":"view","payload":{"p2":{"price":25}}},
        {"event_type":"a2c","payload":{"p2":{"price":25}}},
        {"event_type":"a2c","payload":{"p1":{"price":23}}}
    ],
    "total":4
}

How can I achieve this? Basically, I have to maintain state for all the view events in the window, and when an a2c event arrives, fetch the price from that state. I am not asking for a working solution, just how to maintain the state of all the view events in the window. I also have a custom reduce operation:

events.keyBy("uid").window(..).reduce(new ReduceCustomFun(..)).uid("..").name("..");

In ReduceCustomFun, I join the data of two events into a list.

1 Answer

The easiest thing to do would be to use a ProcessWindowFunction that does all of the window processing at the end of each 10 minute window. Then you would have an Iterable containing all of the events (for a given uid) from which to create the consolidated report, and you wouldn't have to concern yourself with maintaining any state.
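To make that concrete, here is a minimal sketch of the logic that could sit in the body of such a function, written as plain Java so it runs without Flink on the classpath. The `Event` class, its fields, and the `consolidate` method are hypothetical names invented for illustration, and the sketch assumes one product per event, as in the sample data. It copies the window's events into a list while collecting the view prices, then fills in the missing a2c prices.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class WindowConsolidation {

    /** Hypothetical event shape: one product per event; price is null for raw a2c events. */
    static final class Event {
        final String uid;
        final String eventType;
        final String product;
        final Integer price;

        Event(String uid, String eventType, String product, Integer price) {
            this.uid = uid;
            this.eventType = eventType;
            this.product = product;
            this.price = price;
        }
    }

    /** Consolidates all events of one uid in one window, enriching a2c events with view prices. */
    static List<Event> consolidate(Iterable<Event> windowEvents) {
        List<Event> buffered = new ArrayList<>();
        Map<String, Integer> priceByProduct = new HashMap<>();

        // Pass 1: buffer every event and remember the price seen in each view event.
        for (Event e : windowEvents) {
            buffered.add(e);
            if ("view".equals(e.eventType) && e.price != null) {
                priceByProduct.put(e.product, e.price);
            }
        }

        // Pass 2: emit events in arrival order, filling in the price of a2c events.
        // The price stays null if no view event for that product was in the window.
        List<Event> all = new ArrayList<>();
        for (Event e : buffered) {
            if ("a2c".equals(e.eventType) && e.price == null) {
                all.add(new Event(e.uid, e.eventType, e.product, priceByProduct.get(e.product)));
            } else {
                all.add(e);
            }
        }
        return all;
    }
}
```

In a real job, a method like this would be called from the `process(key, context, elements, out)` method of a `ProcessWindowFunction`, passing `elements` as the input, and the job would use `.process(...)` in place of `.reduce(...)`; the `total` field is then just the size of the returned list. Because the prices are collected before any a2c event is enriched, the result does not depend on whether a view event arrives before or after its a2c event within the window.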