
I need an AsyncIO rich function to perform its calls based on the latest set of rules. For operations like map, I was able to process a BroadcastConnectedStream with a rich function by following this blog post: https://flink.apache.org/2019/06/26/broadcast-state.html

However, applying an AsyncIO function requires a DataStream as input, and a BroadcastConnectedStream is not a DataStream (https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.html).

Does anyone have any ideas on how I could work around this limitation? The scenario: I want an async function to stash incoming messages in state when an async call to the outside world fails with a non-transient error, and to resume operation once a "go-ahead" message is received on Kafka (which I thought I could do with a broadcast stream).
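The behaviour described above is essentially a small state machine: running, paused on a non-transient error, and resumed on a go-ahead. A minimal, framework-independent sketch in plain Java follows. StashingGate and its methods are hypothetical names; in a real Flink job this logic would have to live inside an operator, with the stash kept in Flink state rather than in a plain field.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, framework-independent model of "stash on non-transient error,
// resume on go-ahead". In Flink the stash would need to live in operator state.
final class StashingGate<T> {
    private boolean paused = false;
    private final Deque<T> stash = new ArrayDeque<>();

    /** Returns the messages that may be sent to the external service right now. */
    List<T> onMessage(T message) {
        List<T> ready = new ArrayList<>();
        if (paused) {
            stash.addLast(message); // hold the message until a go-ahead arrives
        } else {
            ready.add(message);
        }
        return ready;
    }

    /** A call failed with an error that retrying will not fix: pause and keep the message. */
    void onNonTransientError(T failedMessage) {
        paused = true;
        stash.addLast(failedMessage);
    }

    /** The go-ahead control message arrived: resume and release everything stashed, in order. */
    List<T> onGoAhead() {
        paused = false;
        List<T> released = new ArrayList<>(stash); // copies head-to-tail, preserving order
        stash.clear();
        return released;
    }

    boolean isPaused() { return paused; }

    int stashedCount() { return stash.size(); }
}
```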


2 Answers

Answer 1

I think it should be possible to put a BroadcastProcessFunction (not a keyed one) in front of an async I/O operator, but you'd have to union in the other stream(s) you are processing, since async I/O only has a single input. Given how ugly that is, finding some other way to communicate the "go-ahead" signal might be preferable.
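Because DataStream#union only merges streams with the same element type, one way to feed both data records and control messages through the single input of the async operator is to wrap them in a common envelope. The sketch below is plain Java and does not use Flink; Envelope and EnvelopeDispatcher are hypothetical names, and Flink's own org.apache.flink.types.Either could probably play the same role.

```java
import java.util.function.Consumer;

// Hypothetical envelope so data records and control messages share one element type.
final class Envelope<D, C> {
    private final boolean control;   // true for control messages such as "go-ahead"
    private final D data;            // set only for data records
    private final C controlMessage;  // set only for control messages

    private Envelope(boolean control, D data, C controlMessage) {
        this.control = control;
        this.data = data;
        this.controlMessage = controlMessage;
    }

    static <D, C> Envelope<D, C> ofData(D data) {
        return new Envelope<>(false, data, null);
    }

    static <D, C> Envelope<D, C> ofControl(C controlMessage) {
        return new Envelope<>(true, null, controlMessage);
    }

    boolean isControl() { return control; }

    D data() { return data; }

    C controlMessage() { return controlMessage; }
}

// Single-input consumer that dispatches on the envelope tag, as the async operator would have to.
final class EnvelopeDispatcher<D, C> {
    private final Consumer<D> onData;
    private final Consumer<C> onControl;

    EnvelopeDispatcher(Consumer<D> onData, Consumer<C> onControl) {
        this.onData = onData;
        this.onControl = onControl;
    }

    void accept(Envelope<D, C> envelope) {
        if (envelope.isControl()) {
            onControl.accept(envelope.controlMessage());
        } else {
            onData.accept(envelope.data());
        }
    }
}
```

The explicit boolean tag (rather than checking which field is null) keeps the routing correct even if a payload itself happens to be null.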

Or you might want to look at Stateful Functions, which has more flexibility in this area.

Answer 2

First of all, AsyncFunction does not support keyed state, so you will have to work around that too and implement the state handling yourself using CheckpointedFunction (that is, operator state).
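The CheckpointedFunction approach boils down to copying an in-memory buffer into operator list state in snapshotState and reading it back in initializeState when the job is restored. The plain-Java model below only mirrors that flow: a java.util.List stands in for Flink's ListState, and PendingBuffer, snapshot and restore are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of CheckpointedFunction-style operator state.
// The List passed to snapshot/restore stands in for a Flink ListState.
final class PendingBuffer<T> {
    // Working copy kept in memory between checkpoints.
    private final List<T> pending = new ArrayList<>();

    void add(T element) { pending.add(element); }

    /** Removes and returns everything currently buffered. */
    List<T> drain() {
        List<T> out = new ArrayList<>(pending);
        pending.clear();
        return out;
    }

    /** Mirrors snapshotState: overwrite the checkpointed list with the current buffer. */
    void snapshot(List<T> checkpointedState) {
        checkpointedState.clear();
        checkpointedState.addAll(pending);
    }

    /** Mirrors initializeState: on restore, reload the buffer from checkpointed state. */
    void restore(List<T> checkpointedState, boolean isRestored) {
        pending.clear();
        if (isRestored) {
            pending.addAll(checkpointedState);
        }
    }

    int size() { return pending.size(); }
}
```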

Generally, I don't think there is anything out of the box you can use for this case. The best idea I can think of, if you want to use broadcast state, would be to use a KeyedBroadcastProcessFunction to emit results downstream and then apply the AsyncIO function after it. If you implement your own state handling, you can keep all the requests that have failed and simply retry them.

However, simply collecting all failed requests in a list and looping through them inside one call to retry them is probably not a good idea, as it can hurt performance: you configure how many requests may be in progress at the same time (the capacity), but such a request would take much longer than the others while still occupying only one of those slots.
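A minimal sketch of keeping failed requests and retrying them without one long loop over the whole list: failed requests are queued and handed back in small batches, so each retry could be issued as its own async request. RetryQueue and maxBatch are hypothetical, the batch size is an assumption you would tune, and how the batches are fed back into the async operator is left open.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical queue of failed requests, released in bounded batches.
final class RetryQueue<T> {
    private final Deque<T> failed = new ArrayDeque<>();
    private final int maxBatch;

    RetryQueue(int maxBatch) {
        if (maxBatch <= 0) {
            throw new IllegalArgumentException("maxBatch must be positive");
        }
        this.maxBatch = maxBatch;
    }

    void recordFailure(T request) { failed.addLast(request); }

    /** Removes and returns up to maxBatch failed requests, oldest first. */
    List<T> nextBatch() {
        List<T> batch = new ArrayList<>();
        while (batch.size() < maxBatch && !failed.isEmpty()) {
            batch.add(failed.pollFirst());
        }
        return batch;
    }

    boolean isEmpty() { return failed.isEmpty(); }
}
```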