It may happen that data entering a Flink job triggers an exception, either due to a bug in the code or a lack of validation. My goal is to provide a consistent way of handling exceptions that our team could use within Flink jobs, one that won't cause any downtime in production.
Restart strategies do not seem applicable here because:
- a simple restart won't fix the issue and we would fall into a restart loop
- we cannot simply skip the event
- they can be good for OOMEs or other transient issues, but not for this
- we cannot add a custom one
A try/catch block in the "keyBy" function does not fully help because:
- there's no way to skip the event in "keyBy" once the exception is handled
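To illustrate the problem with a minimal plain-Java sketch (a stand-in for Flink's KeySelector contract, not the real job code; the CSV parsing and the "invalid-key" fallback are made-up examples): the selector must return exactly one key per record, so even after catching the exception we are forced to emit some key and the broken event keeps flowing downstream.

```java
import java.util.function.Function;

// Stand-in for Flink's KeySelector contract: one input in, exactly one key out.
public class KeyByProblem {
    static Function<String, String> keySelector = raw -> {
        try {
            return raw.split(",")[1]; // pretend field 1 is the key
        } catch (RuntimeException e) {
            // We caught the error, but the signature still forces us to
            // return *some* key; there is no way to drop the record here.
            return "invalid-key";
        }
    };

    public static void main(String[] args) {
        System.out.println(keySelector.apply("a,b")); // b
        System.out.println(keySelector.apply("a"));   // invalid-key
    }
}
```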
Sample code:
env.addSource(kafkaConsumer)
.keyBy(keySelector) // must return one result for one entry
.flatMap(mapFunction) // we can skip some entries here in case of errors
.addSink(new PrintSinkFunction<>());
env.execute("Flink Application");
I'd like to have the ability to skip processing of the event that caused the issue in "keyBy" and in similar methods that are supposed to return exactly one result.
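One workaround I'm considering (a sketch only, shown here in plain Java without Flink dependencies; the safe() helper is a hypothetical name): do the failure-prone work in a flatMap placed before keyBy, wrapping it so that exceptions become an empty Optional the flatMap can silently drop, and keyBy then only ever sees already-validated events.

```java
import java.util.Optional;
import java.util.function.Function;

public class SafeMapSketch {
    // Wraps a throwing mapper into one that returns Optional.empty() on
    // failure, so a flatMap can skip the bad event instead of crashing.
    static <I, O> Function<I, Optional<O>> safe(Function<I, O> f) {
        return in -> {
            try {
                return Optional.of(f.apply(in));
            } catch (RuntimeException e) {
                // good place to log the bad event or bump an error metric
                return Optional.empty();
            }
        };
    }

    public static void main(String[] args) {
        Function<String, Optional<Integer>> parse = safe(Integer::parseInt);
        System.out.println(parse.apply("42"));   // Optional[42]
        System.out.println(parse.apply("oops")); // Optional.empty
    }
}
```

Inside an actual Flink FlatMapFunction this would look like `safe(this::parse).apply(value).ifPresent(out::collect);`, emitting zero elements for bad events.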