We're trying to build a use case where data from a stream is run through a calculation formula, but the formula itself should also be updatable (rarely). From reading the documentation, it seems to me that Flink broadcast state would be a natural fit for a case like this.

As an experiment, I've built a simplified version: suppose I have a stream of integers, and a second stream containing multiplication factors for those integers (to which I can send values at will). The second stream is very low-frequency; there could easily be days or weeks between events. For now both streams are implemented as simple socket servers; the end product would use Kafka.

In my example application this all works, but I'm left with one problem: what happens when the system starts and nothing has been sent on the broadcast stream yet? Where could I get the default (or last used) factor from? For now I've solved it in my example by hard-coding a value, but that's not something I could use in the real application.

In my experimental project I'm a bit stumped by this, as `processElement` only gets read-only access to the broadcast state, but `processBroadcastElement` won't be called until there's an update, which could take a long time. My plan was to store the formulae in a database and somehow read them in when the job (re)starts, but I haven't found a way to make this work. Any suggestions from more knowledgeable people would be welcome; this is my first Flink project, so I'm still finding my way around.
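The plan described above (keep the formula in a database, read it when the job starts, and let later broadcasts override it) could be modeled roughly as follows. This is a framework-free sketch, not Flink code: the `HashMap` stands in for the broadcast state, `open()` mimics the lifecycle hook that Flink's rich functions (including `BroadcastProcessFunction`) provide, and `initialFactorLoader` is a hypothetical placeholder for the database query.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Framework-free model of the operator logic; this is NOT Flink's API.
// broadcastState stands in for Flink's broadcast state, and
// initialFactorLoader is a hypothetical stand-in for a database lookup.
class FactorOperatorModel {
    private static final String FACTOR_KEY = "factor";

    private final Map<String, Integer> broadcastState = new HashMap<>();
    private final Supplier<Integer> initialFactorLoader;
    private Integer startupFactor; // loaded once in open()

    FactorOperatorModel(Supplier<Integer> initialFactorLoader) {
        this.initialFactorLoader = initialFactorLoader;
    }

    // Mimics a rich function's open(): runs once before any element arrives.
    void open() {
        startupFactor = initialFactorLoader.get();
    }

    // Element side: only reads the broadcast state, never writes it.
    int processElement(int value) {
        Integer factor = broadcastState.get(FACTOR_KEY);
        if (factor == null) {
            // Nothing broadcast yet: fall back to the value loaded at startup.
            if (startupFactor == null) {
                throw new IllegalStateException("open() was not called");
            }
            factor = startupFactor;
        }
        return value * factor;
    }

    // Broadcast side: the only place where the factor is updated.
    void processBroadcastElement(int newFactor) {
        broadcastState.put(FACTOR_KEY, newFactor);
    }
}
```

The fallback lives in a plain field rather than in the broadcast state because, as noted above, the element side only has read-only access to that state.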

The working example is here: https://github.com/tonvanbart/flink-broadcast-example/tree/mapstate-attempt. The Flink code is in the class `BroadcastState`.

Thanks in advance.

1 Answer

If the job is restarting from a checkpoint or savepoint, then the last factor that was broadcast is restored along with the state, right? So I assume the issue is what to do when the job starts for the first time.

If so, then this is a common problem with the pattern you're using, where you effectively want to block the stream of integers until you've received the initial value from the broadcast stream.

Currently the common solution is to buffer the integers in your operator (using state) until that initial value arrives, but the buffered state can grow without bound, depending on how fast integers are coming in and how long you have to wait.
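The buffering approach can be sketched as below. This is a plain-Java model of the operator logic, not Flink code; the method names `onNumber` and `onFactor` are hypothetical. In a real job the buffer would have to be kept in checkpointed Flink state rather than an `ArrayList`, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.List;

// Framework-free model of "buffer until the first broadcast value arrives".
// In Flink the buffer would need to live in checkpointed state; here it is
// a plain ArrayList for illustration only.
class BufferingMultiplier {
    private final List<Integer> buffer = new ArrayList<>();
    private Integer factor; // null until the first broadcast element

    // Returns the results that can be emitted right now (possibly none).
    List<Integer> onNumber(int value) {
        List<Integer> out = new ArrayList<>();
        if (factor == null) {
            buffer.add(value); // no factor yet: hold the element back
        } else {
            out.add(value * factor);
        }
        return out;
    }

    // On each broadcast, update the factor and flush anything buffered.
    List<Integer> onFactor(int newFactor) {
        factor = newFactor;
        List<Integer> out = new ArrayList<>();
        for (int value : buffer) {
            out.add(value * factor);
        }
        buffer.clear();
        return out;
    }

    // Exposes the buffer size; nothing here caps it, which is the unbounded-state risk.
    int bufferedCount() {
        return buffer.size();
    }
}
```

Nothing limits how many elements `buffer` accumulates before the first `onFactor` call, which is exactly the unbounded-state risk described above.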

Something else you can try is to wrap your integer source (so that the wrapper delegates to the real source) and not emit any values until you know that something has been broadcast. For example, make what's broadcast into queryable state, and check periodically until that state exists.
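The wrapping idea can be sketched as a delegate that blocks until a readiness check passes. This is a plain-Java model that uses an `Iterator` in place of a real Flink source interface; `readyCheck` is a hypothetical placeholder for a client that polls the queryable state, and the polling interval is an arbitrary parameter.

```java
import java.util.Iterator;
import java.util.function.BooleanSupplier;

// Framework-free model of a source wrapper: it delegates to the real integer
// source but emits nothing until readyCheck reports that a value has been
// broadcast. readyCheck is a hypothetical stand-in for polling queryable state.
class GatedSource<T> implements Iterator<T> {
    private final Iterator<T> delegate;
    private final BooleanSupplier readyCheck;
    private final long pollIntervalMillis;
    private boolean ready = false;

    GatedSource(Iterator<T> delegate, BooleanSupplier readyCheck, long pollIntervalMillis) {
        this.delegate = delegate;
        this.readyCheck = readyCheck;
        this.pollIntervalMillis = pollIntervalMillis;
    }

    // Blocks the caller, polling periodically, until the check passes once.
    private void awaitReady() {
        while (!ready) {
            if (readyCheck.getAsBoolean()) {
                ready = true;
            } else {
                try {
                    Thread.sleep(pollIntervalMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting for broadcast", e);
                }
            }
        }
    }

    @Override
    public boolean hasNext() {
        awaitReady();
        return delegate.hasNext();
    }

    @Override
    public T next() {
        awaitReady();
        return delegate.next();
    }
}
```

The gate is checked only until it first passes; after that, elements flow straight through from the delegate. A real source would also need to respond to cancellation while it waits, which this sketch does not model.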