Today, I'd like to address a conceptual question about Flink, rather than a technical one.

In our case, we have two Kafka topics, A and B, that need to be joined. The join should always include all elements from topic A, as well as all new elements from topic B. There are two possibilities to achieve this: either create a new consumer every time and restart consumption of topic A from the beginning, or keep all elements from topic A in state once they have been consumed. The current technical approach joins two DataStreams, which quickly shows its limits for this use case, as there is no way to join streams without a window (fair enough). Elements from topic A are eventually lost once the window moves on, and I have the feeling that regularly resetting the consumer would bypass the elaborate logic Flink introduces.

The other approach I am looking at right now is the Table API. It sounds like the best fit for this job, since it actually keeps all elements in its state for an indefinite amount of time.

Hence my question: before going into the depths of the Table API, only to notice there is a more elegant way, I'd like to find out whether this is the optimal solution for this problem, or whether there is an even better-fitting Flink concept I am not aware of.

Edit: I forgot to mention: we do not use POJOs, but rather keep things generic, which means the incoming data is typed as Tuple2<K,V>, where K and V are each an instance of GenericRecord. The corresponding schema for serialization/deserialization is obtained from the Schema Registry at runtime. I don't know to what extent the SQL constructs could become a bottleneck in this situation. Additionally, this remark from the documentation -- "Both tables must have distinct field names" -- makes me doubt a little, as we do have identical field names, which we will have to handle somehow without huge workarounds.
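To make this concrete, here is roughly what I picture the Table API variant looking like -- just a minimal sketch with made-up alias names, and I have not verified that GenericRecord columns can be compared directly like this (they might end up as RAW types, in which case we would have to extract a primitive key first):

```java
import static org.apache.flink.table.api.Expressions.$;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TableJoinSketch {

    // Alias the tuple fields (f0, f1) while converting each DataStream to a
    // Table, so the two tables end up with distinct column names for the join.
    public static Table joinAWithB(
            StreamTableEnvironment tableEnv,
            DataStream<Tuple2<GenericRecord, GenericRecord>> streamA,
            DataStream<Tuple2<GenericRecord, GenericRecord>> streamB) {

        Table tableA = tableEnv.fromDataStream(
                streamA, $("f0").as("aKey"), $("f1").as("aValue"));
        Table tableB = tableEnv.fromDataStream(
                streamB, $("f0").as("bKey"), $("f1").as("bValue"));

        // Regular (unwindowed) join: Flink keeps both sides in state indefinitely.
        return tableA.join(tableB, $("aKey").isEqual($("bKey")));
    }
}
```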

1 Answer

If A is truly static, then it will be less expensive if you can somehow fully ingest A, either into Flink state or into memory, and then stream B past A -- thereby producing the join results without having to store B.

There are at least a couple of ways to accomplish this with Flink. One is described in this answer, and the other involves using the State Processor API.

With this second approach you would hold A in key-partitioned Flink state. Using the State Processor API, you can bootstrap a savepoint that contains exactly the state you want, so that when your job starts from this savepoint, A is already fully loaded and immediately available.
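A rough sketch of such a bootstrap job, using the DataSet-based State Processor API (Flink 1.9+); the operator uid, state name, and savepoint path are made up, and plain Strings stand in for your GenericRecords:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

public class BootstrapA {

    // Writes each record from A into keyed ValueState (keyed by f0).
    public static class ABootstrapper
            extends KeyedStateBootstrapFunction<String, Tuple2<String, String>> {

        private transient ValueState<String> aValue;

        @Override
        public void open(Configuration parameters) {
            aValue = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("aValue", String.class));
        }

        @Override
        public void processElement(Tuple2<String, String> record, Context ctx)
                throws Exception {
            aValue.update(record.f1);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for reading the full contents of topic A as a bounded DataSet.
        DataSet<Tuple2<String, String>> topicA = env.fromElements(
                Tuple2.of("k1", "v1"), Tuple2.of("k2", "v2"));

        BootstrapTransformation<Tuple2<String, String>> transformation =
                OperatorTransformation
                        .bootstrapWith(topicA)
                        .keyBy(record -> record.f0)
                        .transform(new ABootstrapper());

        Savepoint
                .create(new MemoryStateBackend(), 128) // 128 = max parallelism
                .withOperator("join-with-a", transformation) // uid must match the streaming job
                .write("file:///tmp/bootstrap-savepoint");

        env.execute("bootstrap A");
    }
}
```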

There's a simple example of bootstrapping keyed state in this gist. Once you have created the savepoint, you then need to implement a streaming job that uses it to compute the join -- which can be done with a RichFlatMapFunction.
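For instance, assuming the bootstrap sketch above (same state name and operator uid), the join side could look roughly like this:

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Runs on stream B, keyed by f0. Because the job is started from the
// bootstrapped savepoint, the "aValue" state is already populated with A.
public class JoinBAgainstA
        extends RichFlatMapFunction<Tuple2<String, String>, Tuple3<String, String, String>> {

    private transient ValueState<String> aValue;

    @Override
    public void open(Configuration parameters) {
        // Must use the same state name and type as the bootstrap function.
        aValue = getRuntimeContext().getState(
                new ValueStateDescriptor<>("aValue", String.class));
    }

    @Override
    public void flatMap(Tuple2<String, String> b,
            Collector<Tuple3<String, String, String>> out) throws Exception {
        String a = aValue.value();
        if (a != null) {
            // Emit (key, value from A, value from B).
            out.collect(Tuple3.of(b.f0, a, b.f1));
        }
    }
}
```

Wire it up as something like streamB.keyBy(b -> b.f0).flatMap(new JoinBAgainstA()).uid("join-with-a") -- the uid has to match the one used when writing the savepoint, and the job must be started from that savepoint.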

The other alternative for implementing joins without the Table API is to simply roll your own with a RichCoFlatMapFunction or a KeyedCoProcessFunction. You will find examples of this in the Flink training; none of them matches your requirements exactly, but they give the general flavor. I don't see any advantage to this, however -- if you are going to do a fully dynamic/dynamic join, you might as well use the Table API.
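For completeness, a minimal sketch of that roll-your-own variant (again with Strings standing in for your GenericRecords): A is remembered per key, and each arriving B element is joined against it. Note that this drops B elements whose key hasn't yet been seen on A; buffering them would require additional state.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Usage: streamA.connect(streamB).keyBy(a -> a.f0, b -> b.f0).process(new AJoinB())
public class AJoinB extends KeyedCoProcessFunction<
        String,                            // key
        Tuple2<String, String>,            // elements from A
        Tuple2<String, String>,            // elements from B
        Tuple3<String, String, String>> {  // (key, value from A, value from B)

    private transient ValueState<String> aValue;

    @Override
    public void open(Configuration parameters) {
        aValue = getRuntimeContext().getState(
                new ValueStateDescriptor<>("aValue", String.class));
    }

    @Override
    public void processElement1(Tuple2<String, String> a, Context ctx,
            Collector<Tuple3<String, String, String>> out) throws Exception {
        // Remember (or overwrite) the A-side value for this key, indefinitely.
        aValue.update(a.f1);
    }

    @Override
    public void processElement2(Tuple2<String, String> b, Context ctx,
            Collector<Tuple3<String, String, String>> out) throws Exception {
        String a = aValue.value();
        if (a != null) {
            out.collect(Tuple3.of(b.f0, a, b.f1));
        }
    }
}
```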