To clarify my comment, here is a sketch of what I was suggesting, based on The Broadcast State Pattern. The linked page gives its example in Java, so I will follow it; a Scala version should not be much different. You will likely have to implement the code below as explained on that page:
    DataStream<String> output = colorPartitionedStream
        .connect(ruleBroadcastStream)
        .process(
            // type arguments in our KeyedBroadcastProcessFunction represent:
            //   1. the key of the keyed stream
            //   2. the type of elements in the non-broadcast side
            //   3. the type of elements in the broadcast side
            //   4. the type of the result, here a string
            new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
                // my matching logic
            }
        );
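To make the "matching logic" placeholder more concrete, here is a minimal plain-Java model of its two halves. It is only a sketch and does not use the Flink API: `RuleMatcher`, `onRule`, and `onItem` are hypothetical names, a plain `Map` stands in for the broadcast state, and rules are modelled as `Predicate<String>` rather than the `Rule` type. In Flink, the two methods would roughly correspond to `processBroadcastElement` and `processElement` of the `KeyedBroadcastProcessFunction`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Plain-Java model of the two callbacks (hypothetical names, not Flink API).
class RuleMatcher {
    // Stands in for the broadcast state: rule name -> rule.
    private final Map<String, Predicate<String>> rules = new LinkedHashMap<>();

    // Broadcast side: called for every rule that arrives.
    // A rule with an already known name overwrites the older version.
    void onRule(String name, Predicate<String> rule) {
        rules.put(name, rule);
    }

    // Non-broadcast side: called for every item.
    // Returns the names of all currently stored rules that the item satisfies.
    List<String> onItem(String item) {
        List<String> matched = new ArrayList<>();
        for (Map.Entry<String, Predicate<String>> entry : rules.entrySet()) {
            if (entry.getValue().test(item)) {
                matched.add(entry.getKey());
            }
        }
        return matched;
    }
}
```

The point of the model is that items are always evaluated against whatever rules have arrived so far, so a rule that is re-sent later simply replaces its previous version.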
I was suggesting that you populate ruleBroadcastStream at fixed intervals from the database, or whatever your store is. Instead of creating the broadcast stream like this:
    // broadcast the rules and create the broadcast state
    BroadcastStream<Rule> ruleBroadcastStream = ruleStream
        .broadcast(ruleStateDescriptor);
as the web page does, you will need to add a source that you can schedule to run every X minutes:
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    BroadcastStream<Rule> ruleBroadcastStream = env
        .addSource(new YourStreamSource())
        .broadcast(ruleStateDescriptor);
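    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

    // The element type must be Rule so that the result can be assigned to BroadcastStream<Rule>.
    public class YourStreamSource extends RichSourceFunction<Rule> {
        // volatile: cancel() is called from a different thread than run()
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Rule> ctx) throws Exception {
            while (running) {
                // TODO: Rule yourData = FETCH DATA from your store;
                ctx.collect(yourData);
                // Thread.sleep takes milliseconds; X is a placeholder for your interval in minutes
                Thread.sleep(X * 60 * 1000L);
            }
        }

        @Override
        public void cancel() {
            this.running = false;
        }
    }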
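The fetch, emit, sleep cycle of such a source can be tried out without a Flink cluster. Below is a minimal plain-Java sketch of the same control flow. It is not Flink code: `PeriodicPoller`, `fetch`, and `emit` are hypothetical names standing in for the source class, the database query, and `ctx.collect`, and the interval is passed in milliseconds as an assumption.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Plain-Java model of a periodic source (hypothetical names, not Flink API).
class PeriodicPoller<T> implements Runnable {
    // volatile so that a cancel() from another thread is seen by the loop
    private volatile boolean running = true;
    private final Supplier<T> fetch;   // stands in for the database query
    private final Consumer<T> emit;    // stands in for ctx.collect(...)
    private final long intervalMillis; // pause between two fetches

    PeriodicPoller(Supplier<T> fetch, Consumer<T> emit, long intervalMillis) {
        this.fetch = fetch;
        this.emit = emit;
        this.intervalMillis = intervalMillis;
    }

    @Override
    public void run() {
        while (running) {
            // Fetch the current data and hand it downstream.
            emit.accept(fetch.get());
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop polling.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Asks the loop to stop; it exits once the current sleep is over.
    void cancel() {
        running = false;
    }
}
```

For example, `new Thread(new PeriodicPoller<>(() -> "rules", System.out::println, 1000L)).start()` would print "rules" about once per second until `cancel()` is called on the poller. Note that with an interval of several minutes the loop only notices the cancel flag after the current sleep ends, unless the thread is interrupted.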
The rules emitted by this source are then broadcast with ruleStateDescriptor to renew the state every X minutes. - Felipe