
I have a question about joining two streams in Flink. I have two different dataflows that I need to join at some point. Each dataflow has been tagged with a unique id, which serves as the joining point between these flows. There is no concept of a window, so to connect these two dataflows I do first.connect(second).keyBy(0, 0).

This seems to work, since I get the correct results, but I am worried about the long term. I do not explicitly keep any state in the operator (a coFlatMap) that does the join. What happens if, say, one flow (e.g., first) provides the unique id but the second never provides the matching id? (I assume that for ids that have already been joined, the operator discards any internal state.) Does the memory/state footprint grow constantly, or is there some kind of expiration mechanism?

And if it does grow, how can I tackle this problem? Alternatively, can you suggest another approach?

Answer

There are a few approaches to implement this join.

  1. Use a CoProcessFunction. When the first record for a key arrives, you store it in state and register a timer that fires x minutes/hours/days later. When the second record arrives, you perform the join and clear the state. If the second record does not arrive, the onTimer() method is called when the timer fires. At that point, you can either just clear the state and return (INNER JOIN semantics) or forward the first record padded with a null value (OUTER JOIN semantics), clear the state, and return. Note that the timer also fires for keys that were already joined unless you delete it, so onTimer() should check whether the state is still set. The timer serves as a safety net that guarantees the state is removed at some point. How long you wait for the second record to arrive depends on your requirements.

  2. The Table API and SQL provide a time-windowed join that works similarly to what I described in 1. The difference is that the windowed join tries to join all records (i.e., more than one from each input stream) that arrive during the join interval and hence keeps the state longer. However, once time has passed the end of the join interval, it clears the state.

  3. Flink 1.6.0 (to be released in early August 2018) will include an interval join for the DataStream API, which works similarly to the window join of the Table API (similar logic, different name). Like the Table API join, it keeps state longer than the custom implementation from 1., which assumes that each key appears just once on each side.
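
The timer-based join from approach 1 can be sketched as a plain-Java model to show how state is created and removed. This is not Flink code: the hypothetical class TimedJoinModel stands in for a CoProcessFunction on the keyed connected streams, HashMaps stand in for per-key ValueState, and a priority queue stands in for the TimerService. The method names processElement1, processElement2 and onTimer mirror the Flink callbacks, but their signatures are simplified, and advanceTime is a hypothetical stand-in for event time or processing time moving forward. Like the answer, it assumes each key appears at most once per input.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Plain-Java model of approach 1 (NOT the Flink API): HashMaps play the role of
// per-key ValueState and a PriorityQueue plays the role of the TimerService.
// Assumes each key appears at most once on each input.
public class TimedJoinModel {
    private static final class Timer {
        final long time;
        final String key;
        Timer(long time, String key) { this.time = time; this.key = key; }
    }

    private final long timeout;        // how long to wait for the other side
    private final boolean outerJoin;   // true: emit unmatched records padded with null
    private final Map<String, String> leftState = new HashMap<>();
    private final Map<String, String> rightState = new HashMap<>();
    private final Map<String, Long> timerState = new HashMap<>(); // active timer per key
    private final PriorityQueue<Timer> timers =
            new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.time));
    private final List<String> output = new ArrayList<>();

    public TimedJoinModel(long timeout, boolean outerJoin) {
        this.timeout = timeout;
        this.outerJoin = outerJoin;
    }

    // Mirrors processElement1 / processElement2 of a CoProcessFunction.
    public void processElement1(String key, String value, long now) {
        onElement(key, value, now, leftState, rightState, true);
    }

    public void processElement2(String key, String value, long now) {
        onElement(key, value, now, rightState, leftState, false);
    }

    private void onElement(String key, String value, long now, Map<String, String> own,
                           Map<String, String> other, boolean isLeft) {
        String match = other.remove(key);
        if (match != null) {
            // Second record arrived: join, then clear the state for this key.
            output.add(key + ":" + (isLeft ? value : match) + "," + (isLeft ? match : value));
            timerState.remove(key);
        } else {
            // First record: store it and register a cleanup timer.
            own.put(key, value);
            long fireAt = now + timeout;
            timerState.put(key, fireAt);
            timers.add(new Timer(fireAt, key));
        }
    }

    // Hypothetical stand-in for time advancing: fires every timer due at or before 'now'.
    public void advanceTime(long now) {
        while (!timers.isEmpty() && timers.peek().time <= now) {
            Timer t = timers.poll();
            onTimer(t.time, t.key);
        }
    }

    private void onTimer(long time, String key) {
        Long active = timerState.get(key);
        if (active == null || active != time) {
            return; // key was already joined, so there is nothing to clean up
        }
        timerState.remove(key);
        String left = leftState.remove(key);
        String right = rightState.remove(key);
        if (outerJoin) {
            output.add(key + ":" + left + "," + right); // missing side prints as "null"
        }
    }

    public List<String> output() { return new ArrayList<>(output); }

    public int stateSize() { return leftState.size() + rightState.size(); }

    public static void main(String[] args) {
        TimedJoinModel join = new TimedJoinModel(60_000L, true);
        join.processElement1("id-1", "a", 0L);
        join.processElement2("id-1", "b", 10_000L); // joined, state cleared
        join.processElement1("id-2", "c", 20_000L); // never matched on input 2
        System.out.println(join.output() + " pending=" + join.stateSize());
        join.advanceTime(80_000L);                  // timer for id-2 fires
        System.out.println(join.output() + " pending=" + join.stateSize());
    }
}
```

In a real job you would keep the pending records in ValueState, register the timer through the context's timer service (event time or processing time), and emit results through the Collector. Comparing the fired timestamp with the one stored for the key, as the model does, is one way to ignore timers for keys that were already joined; deleting the timer when the join succeeds is another.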

I would go for approach 1 because it is more memory-efficient and still reasonably easy to implement.
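
To make the memory difference concrete, here is a similar plain-Java model of an interval join. Again, this is not Flink's implementation; IntervalJoinModel, processLeft, processRight and advanceWatermark are hypothetical names. It assumes a right record matches a left record with the same key when its timestamp lies in [left.ts + lower, left.ts + upper], the bound definition used by the DataStream interval join, and that no record with a timestamp at or below the current watermark arrives later. Because another matching record could still arrive, every record stays buffered until the watermark passes its bound, even after it has been joined.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of an interval join (NOT Flink's implementation): a right record r
// joins a left record l with the same key when l.ts + lower <= r.ts <= l.ts + upper.
// Assumes no record with a timestamp at or below the current watermark arrives later.
public class IntervalJoinModel {
    private static final class Rec {
        final long ts;
        final String value;
        Rec(long ts, String value) { this.ts = ts; this.value = value; }
    }

    private final long lower;
    private final long upper;
    private final Map<String, List<Rec>> leftBuf = new HashMap<>();
    private final Map<String, List<Rec>> rightBuf = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    public IntervalJoinModel(long lower, long upper) {
        this.lower = lower;
        this.upper = upper;
    }

    public void processLeft(String key, String value, long ts) {
        // Join with every buffered right record in range; the left record stays buffered.
        for (Rec r : rightBuf.getOrDefault(key, Collections.emptyList())) {
            if (r.ts >= ts + lower && r.ts <= ts + upper) {
                output.add(key + ":" + value + "," + r.value);
            }
        }
        leftBuf.computeIfAbsent(key, k -> new ArrayList<>()).add(new Rec(ts, value));
    }

    public void processRight(String key, String value, long ts) {
        // Join with every buffered left record in range; the right record stays buffered.
        for (Rec l : leftBuf.getOrDefault(key, Collections.emptyList())) {
            if (ts >= l.ts + lower && ts <= l.ts + upper) {
                output.add(key + ":" + l.value + "," + value);
            }
        }
        rightBuf.computeIfAbsent(key, k -> new ArrayList<>()).add(new Rec(ts, value));
    }

    // Drop records that no future record (timestamp > watermark) can match.
    public void advanceWatermark(long watermark) {
        // A left record can still match right records with ts up to l.ts + upper.
        leftBuf.values().forEach(list -> list.removeIf(l -> l.ts + upper <= watermark));
        // A right record can still match left records with ts up to r.ts - lower.
        rightBuf.values().forEach(list -> list.removeIf(r -> r.ts - lower <= watermark));
        leftBuf.values().removeIf(List::isEmpty);
        rightBuf.values().removeIf(List::isEmpty);
    }

    public List<String> output() { return new ArrayList<>(output); }

    public int stateSize() {
        int n = 0;
        for (List<Rec> list : leftBuf.values()) n += list.size();
        for (List<Rec> list : rightBuf.values()) n += list.size();
        return n;
    }

    public static void main(String[] args) {
        IntervalJoinModel join = new IntervalJoinModel(0L, 60_000L);
        join.processLeft("id-1", "a", 0L);
        join.processRight("id-1", "b", 10_000L);  // joined, but both records stay buffered
        System.out.println(join.output() + " buffered=" + join.stateSize());
        join.processRight("id-1", "b2", 20_000L); // a second match for the same left record
        join.advanceWatermark(60_000L);           // now nothing can match any more
        System.out.println(join.output() + " buffered=" + join.stateSize());
    }
}
```

After id-1 is joined, the timer model of approach 1 holds no state for it, while this model still holds both records until the watermark reaches 60_000 and emits a second result when another right record for id-1 arrives. That extra buffering is the memory cost of the more general join semantics.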