I read data from four Kinesis streams. The data in each stream is a different data type. After reading all four streams in, I assign timestamps and watermarks, and aggregate the data from each stream. The results of the four aggregations are all output using the same generic object. I want to union the results from the four streams so I can send the unioned stream to one ProcessFunction. This essentially would allow me to use the ProcessFunction like a CoProcessFunction, but I would be able to deal with data from more than two streams (in this case the ProcessFunction would receive the aggregations from all four individual streams).

However, my concern is that this might not play well with watermarks. If the union simply forwards every watermark it receives, then the process function's watermark would effectively be the max of the watermarks from the four individual streams. In that case, if one stream takes longer to process or is otherwise behind the others, its aggregation might not make it to the process function in time.

My question is this: How do watermarks get handled in union operators and how do operators downstream of the union process those watermarks?

Additionally: If the union of generic objects does not work due to watermark issues, what is the best way to combine the results of four different aggregations when Flink only supports a CoProcessFunction for two streams?

2 Answers

1
votes

The other way to connect more than two streams is to build a tree of pairwise connections until all of the streams have been joined, either as a balanced tree, like this:

A--->
     A+B---->
B--->

            A+B+C+D------------>

C--->
     C+D---->
D--->

or by adding in one stream at a time, like this:

a--->
     a+b--->
b--->
            a+b+c--->
     c----->
                     a+b+c+d--->
            d------->
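
The two shapes above can be sketched in plain Scala, independent of Flink. This is only an illustration: `chain`, `balanced`, and the `combine` parameter are hypothetical names, and in a real job `combine` would stand for something like `left.connect(right).process(...)`.

```scala
object PairwiseShapes {
  // Left-deep chain: fold one input in at a time, as in the second diagram.
  def chain[T](inputs: Seq[T])(combine: (T, T) => T): T =
    inputs.reduceLeft(combine)

  // Balanced tree: split in half, combine each half, then combine the two
  // results, as in the first diagram.
  def balanced[T](inputs: Seq[T])(combine: (T, T) => T): T = {
    require(inputs.nonEmpty, "need at least one input")
    if (inputs.size == 1) inputs.head
    else {
      val (left, right) = inputs.splitAt(inputs.size / 2)
      combine(balanced(left)(combine), balanced(right)(combine))
    }
  }

  def main(args: Array[String]): Unit = {
    // Strings stand in for streams so the resulting shape is visible.
    println(balanced(Seq("A", "B", "C", "D"))((l, r) => s"($l+$r)")) // ((A+B)+(C+D))
    println(chain(Seq("a", "b", "c", "d"))((l, r) => s"($l+$r)"))    // (((a+b)+c)+d)
  }
}
```

With four inputs the balanced form has two levels of pairwise operators, while the chain has three.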

FWIW, FLIP-92 is a proposal to add an n-ary stream operator to Flink, but even if implemented, it probably won't be user visible, at least at first.

1
votes

Watermarks in a union work the same way as watermarks across parallel streams: the watermark is always the minimum of the watermarks from all input streams. The same holds for downstream operators: their watermark will be the minimum over all of their input streams.
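
To make the "min of all inputs" rule concrete, here is a toy model in plain Scala. It is a sketch of the rule only, not Flink's actual implementation; `InputWatermarks` and `onWatermark` are hypothetical names invented for this example.

```scala
object MinWatermarkDemo {
  // Simplified model of an operator with several inputs: it remembers the
  // highest watermark seen on each input and exposes the minimum of those.
  final class InputWatermarks(numInputs: Int) {
    private val perInput = Array.fill(numInputs)(Long.MinValue)
    private var current = Long.MinValue

    // Records a watermark from one input. Returns the new operator
    // watermark if it advanced, otherwise None.
    def onWatermark(input: Int, timestamp: Long): Option[Long] = {
      perInput(input) = math.max(perInput(input), timestamp)
      val candidate = perInput.min
      if (candidate > current) {
        current = candidate
        Some(candidate)
      } else None
    }

    def currentWatermark: Long = current
  }

  def main(args: Array[String]): Unit = {
    val wm = new InputWatermarks(4)
    println(wm.onWatermark(0, 100L)) // None: the other inputs have sent nothing yet
    println(wm.onWatermark(1, 90L))  // None
    println(wm.onWatermark(2, 120L)) // None
    println(wm.onWatermark(3, 50L))  // Some(50): the slowest input sets the pace
    println(wm.onWatermark(3, 95L))  // Some(90): input 1 is now the slowest
  }
}
```

In this model a lagging stream holds the downstream watermark back instead of being overtaken by the faster ones.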

To be honest, I don't think that union depends on the watermarks in any way. But if you want to use the CoProcessFunction for any reason, I can offer this somewhat hacky way. You can create a Seq of the streams that you have generated and then reduce over it:

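// Streams defined elsewhere. For reduce to type-check, all four streams must
// share one element type and process(...) must emit that same type.
val seq = Seq(stream, stream2, stream3, stream4)
seq.reduce((left, right) => left.connect(right).process(...))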