5
votes

I have three keyed data streams of different types.

DataStream<A> first;
DataStream<B> second;
DataStream<C> third;

Each stream has its own processing logic, and all three share state. I want to connect these three streams so that the respective processing function is triggered whenever data is available on any of them. Connecting two streams is possible:

first.connect(second).process(<CoProcessFunction>)

I can't use union (which allows multiple data streams) because the types are different. I want to avoid creating a wrapper and converting all the streams into the same type.

2 Answers

4
votes

The wrapper approach isn't too bad, really. You can create an EitherOfThree<T1, T2, T3> wrapper class that's similar to Flink's existing Either<Left, Right>, and then process a stream of those records in a single function. Something like:

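    // Wrap each stream in the common type; exactly one field is non-null per record.
    // Lambdas that return a generic type may need an explicit .returns(...) type hint.
    DataStream<EitherOfThree<A, B, C>> combo = first.map(r -> new EitherOfThree<A, B, C>(r, null, null))
        .union(second.map(r -> new EitherOfThree<A, B, C>(null, r, null)))
        .union(third.map(r -> new EitherOfThree<A, B, C>(null, null, r)));
    // Key combo (keyBy) before process(...) if the function relies on keyed state.
    combo.process(new MyProcessFunction());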

Flink's Either class has a more elegant implementation, but for your use case something simple should work.
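For illustration, here is a minimal sketch of what such a simple wrapper might look like. EitherOfThree is not a Flink class. The three-argument constructor matches the snippet above, while the accessor names and the fold helper are assumptions of mine. It is plain Java with no Flink dependency.

```java
import java.util.function.Function;

/**
 * Hypothetical wrapper holding exactly one of three values (not part of Flink).
 * Declared package-private here; for real use, make it public in its own file.
 */
final class EitherOfThree<T1, T2, T3> {
    private final T1 first;
    private final T2 second;
    private final T3 third;

    /** Exactly one argument must be non-null, as in the map(...) calls above. */
    public EitherOfThree(T1 first, T2 second, T3 third) {
        int present = (first != null ? 1 : 0)
                + (second != null ? 1 : 0)
                + (third != null ? 1 : 0);
        if (present != 1) {
            throw new IllegalArgumentException(
                    "exactly one value must be non-null, got " + present);
        }
        this.first = first;
        this.second = second;
        this.third = third;
    }

    public boolean isFirst() { return first != null; }
    public boolean isSecond() { return second != null; }
    public boolean isThird() { return third != null; }

    public T1 getFirst() { return first; }
    public T2 getSecond() { return second; }
    public T3 getThird() { return third; }

    /** Applies the handler that corresponds to whichever value is present. */
    public <R> R fold(Function<? super T1, ? extends R> onFirst,
                      Function<? super T2, ? extends R> onSecond,
                      Function<? super T3, ? extends R> onThird) {
        if (first != null) {
            return onFirst.apply(first);
        }
        if (second != null) {
            return onSecond.apply(second);
        }
        return onThird.apply(third);
    }
}
```

Inside the process function you would then branch on isFirst() / isSecond() / isThird() (or call fold) and run the logic for the matching type, with all three branches seeing the same state. As written, Flink would most likely treat this class as a generic type rather than a POJO, so you may want to adapt it to the POJO rules or supply type information if serialization performance matters.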

2
votes

Other than union, the standard approach is to use connect in a cascade, e.g.,

first.connect(second).process(...).connect(third).process(...)

You won't be able to share state between all three streams in one place. You can have the first process function output whatever the subsequent process function will need, but the third stream won't be able to affect the state in the first process function, which is a problem for some use cases.

Another possibility might be to leverage a lower-level mechanism -- see FLIP-92: Add N-Ary Stream Operator in Flink. However, this mechanism is intended for internal use (the Table/SQL API uses this for n-way joins), and would need to be treated with caution. See the mailing list discussion for details. I mention this for completeness, but I'm skeptical this is a good idea until the interface is further developed.

You might also want to look at the Stateful Functions API, which overcomes many of the restrictions of the DataStream API.