2
votes

I need to enrich my fast-changing streamA, keyed by (userId, startTripTimestamp), with the slowly changing streamB, keyed by (userId).

I use Flink 1.8 with the DataStream API. I am considering two approaches:

  1. Broadcast streamB and join the streams by userId and most recent timestamp. Would this be equivalent to a DynamicTable from the Table API? I can see one downside of this solution: the whole of streamB needs to fit into the RAM of each worker node, which increases overall RAM utilization.

  2. Generalise the state of streamA to a stream keyed by just (userId), let's call it streamC, so that it has a common key with streamB. Then I am able to union streamC with streamB, order by processing time, and handle both types of events in state. Handling the generalised stream is more complex (more code in the process function), but it does not consume as much RAM, because streamB is not replicated on every node. Are there any other downsides or upsides to this solution?

I have also seen this proposal https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API where it is said:

In general, most of these follow the pattern of joining a main stream of high throughput with one or several inputs of slowly changing or static data:

[...]

Join stream with slowly evolving data: This is very similar to the above case but the side input that we use for enriching is evolving over time. This can be done by waiting for some initial data to be available before processing the main input and the continuously ingesting new data into the internal side input structure as it arrives.

Unfortunately, it looks like this feature is still a long way off (https://issues.apache.org/jira/browse/FLINK-6131), and no alternatives are described. Therefore I would like to ask what the currently recommended approach is for the described use case.

I've seen "Combining low-latency streams with multiple meta-data streams in Flink (enrichment)", but it does not specify what the keys of those streams are. Moreover, it was answered at the time of Flink 1.4, so I expect the recommended solution might have changed.

2 Answers

3
votes

Building on top of what Gaurav Kumar has already answered.

The main question is: do you need to exactly match records from streamA and streamB, or is a best-effort match enough? For example, is it an issue for you that, because of a race condition, some (a lot of?) records from streamA can be processed before some updates from streamB arrive, for example during start-up?

I would suggest drawing inspiration from how the Table API solves this issue. A Temporal Table Join is probably the right choice for you, which leaves you with one decision: processing time or event time?

Both of Gaurav Kumar's proposals are implementations of processing-time Temporal Table Joins, which assume that records can be joined very loosely and do not have to be timed properly.

If records from streamA and streamB have to be timed properly, then one way or another you have to buffer some of the records from both streams. There are various ways to do it, depending on what semantics you want to achieve. After deciding on that, the actual implementation is not that difficult, and you can draw inspiration from the Table API join operators (the org.apache.flink.table.runtime.join package in the flink-table-planner module).
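To make the buffering idea concrete, here is a minimal plain-Java model of one possible event-time semantic: each streamA record is held back until the watermark passes its timestamp, and is then joined with the streamB version that was valid at that timestamp. This is only a sketch and not Flink code or the Table API operator; the class name `EventTimeEnricher`, its methods and the string output format are made up for illustration, and a real job would keep both buffers in keyed state and clean up old versions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical plain-Java model of an event-time enrichment join (single task, no Flink). */
public class EventTimeEnricher {

    /** A streamA record waiting for the watermark to pass its timestamp. */
    private static final class Pending {
        final String userId;
        final long timestamp;
        final String payload;

        Pending(String userId, long timestamp, String payload) {
            this.userId = userId;
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // userId -> (event time of the update -> streamB value valid from that time on)
    private final Map<String, TreeMap<Long, String>> versions = new HashMap<>();

    // streamA records that cannot be joined safely yet
    private final List<Pending> buffered = new ArrayList<>();

    /** Stores a new version of the slowly changing data. */
    public void onStreamB(String userId, long timestamp, String value) {
        versions.computeIfAbsent(userId, k -> new TreeMap<>()).put(timestamp, value);
    }

    /** Buffers a fast-stream record instead of joining it immediately. */
    public void onStreamA(String userId, long timestamp, String payload) {
        buffered.add(new Pending(userId, timestamp, payload));
    }

    /**
     * Emits every buffered streamA record whose timestamp is <= watermark,
     * joined with the latest streamB version at or before that timestamp.
     */
    public List<String> onWatermark(long watermark) {
        List<String> out = new ArrayList<>();
        Iterator<Pending> it = buffered.iterator();
        while (it.hasNext()) {
            Pending p = it.next();
            if (p.timestamp > watermark) {
                continue; // later updates for this timestamp may still arrive
            }
            TreeMap<Long, String> userVersions = versions.get(p.userId);
            Map.Entry<Long, String> version =
                    userVersions == null ? null : userVersions.floorEntry(p.timestamp);
            String enrichment = version == null ? "null" : version.getValue();
            out.add(p.userId + "@" + p.timestamp + ":" + p.payload + "+" + enrichment);
            it.remove();
        }
        return out;
    }
}
```

The point of the model is that the result no longer depends on arrival order: a streamA record that arrives before its matching streamB update is still joined correctly, at the cost of holding it in memory until the watermark advances.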

Side inputs (which you referenced) and/or input selection are just tools for controlling the number of unnecessarily buffered records. You can implement a valid Flink job without them, but the memory consumption can be hard to control if one stream significantly overtakes the other (in terms of event time; for processing time it is a non-issue).

1
votes

The answer depends on the size of the streamB state that needs to be used to enrich streamA.

  • If you broadcast your streamB state, you are putting all userIds from streamB on each of the task managers. Each task on a task manager will only see a subset of these userIds from streamA, so some of the streamB data will never be used there and is simply wasted. If the streamB state is small enough that it does not noticeably affect your job or take memory away from state management, you can keep the whole streamB state on every task manager. This is your #1.
  • If your streamB state is really huge and can consume considerable memory on the task managers, you should consider approach #2. Key both streams by the same ID so that elements with the same userId reach the same task; you can then use managed state to maintain the per-key streamB state and enrich streamA elements from it.
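The memory trade-off between the two options can be illustrated with a small plain-Java model. This is not Flink code: the class `EnrichmentStateModel`, the modulo-based routing and the output format are simplifying assumptions (Flink assigns keys to tasks via key groups, not a plain hash modulo). It only shows that broadcasting stores every streamB entry once per parallel task, while keying stores each entry once in total, and that both variants are processing-time joins that enrich with whatever value happens to be present.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of per-task enrichment state, either broadcast or keyed by userId. */
public class EnrichmentStateModel {

    // One map per parallel task, standing in for that task's state.
    private final List<Map<String, String>> taskState = new ArrayList<>();
    private final boolean broadcast;

    public EnrichmentStateModel(int parallelism, boolean broadcast) {
        this.broadcast = broadcast;
        for (int i = 0; i < parallelism; i++) {
            taskState.add(new HashMap<>());
        }
    }

    // Simplified routing (an assumption of this model): hash of the key modulo parallelism.
    private int ownerOf(String userId) {
        return Math.floorMod(userId.hashCode(), taskState.size());
    }

    /** Applies a streamB update: to every task when broadcast, to one task when keyed. */
    public void onStreamB(String userId, String value) {
        if (broadcast) {
            for (Map<String, String> state : taskState) {
                state.put(userId, value);
            }
        } else {
            taskState.get(ownerOf(userId)).put(userId, value);
        }
    }

    /**
     * Enriches a streamA element with the latest known streamB value for the user,
     * or "null" if no update has arrived yet (the race condition at start-up).
     * In the broadcast variant any task could serve the lookup; the model reuses
     * the same task for simplicity.
     */
    public String onStreamA(String userId, String payload) {
        String latest = taskState.get(ownerOf(userId)).get(userId);
        return payload + "+" + latest;
    }

    /** Total number of streamB entries held across all tasks. */
    public int totalEntries() {
        int total = 0;
        for (Map<String, String> state : taskState) {
            total += state.size();
        }
        return total;
    }
}
```

With 10 users and a parallelism of 4, the keyed variant holds 10 entries in total and the broadcast variant holds 40. In an actual Flink job, the keyed variant would roughly correspond to connecting the two streams, keying both by userId and keeping the latest streamB value in keyed state.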