1
votes

I am trying to figure out the best practices for dealing with poison messages / unhandled exceptions in Apache Flink. We have a job doing real-time event processing of location data from IoT devices. There are two potential scenarios where this can arise:

  1. Data is bad in some way - e.g. invalid value
  2. Data triggers a bug due to some edge case we have not anticipated.

Currently, all my data processing stops because of just one message.

I've seen two suggestions:

  1. Catch the exceptions - this requires wrapping every piece of logic with something that catches every runtime exception
  2. Use side outputs as a kind of DLQ - from what I can tell this seems to be a variation on #1 where I have to catch all the exceptions and send them to the side output.

Is there really no way to do this other than wrapping every piece of logic in exception handling? Is there no generic way to catch exceptions and still have processing continue?

1
Where are the messages coming from? Kafka? - sinanspd
Why does it matter? Sometimes Rabbit MQ, sometimes MQTT, sometimes via websockets. Rabbit MQ is the only out of the box source we use. - Dan Diephouse

1 Answer

4
votes

I think the idea is not to catch all kinds of exceptions and send them elsewhere, but rather to have well-tested and functioning code and use dead letters only for invalid inputs.

So a typical pipeline would be

source => validate => ... => sink
                  \=> dead letter queue

As soon as your record passes your validate operator, you want all errors to bubble up, as any error in these operators may result in corrupted aggregates and data that - once written - cannot be reverted easily.

The validate step would work with either of the two approaches that you outlined. Typically, side outputs have better semantics, but you may end up with more code.
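To make the validate step concrete, here is a minimal sketch in plain Java, without Flink on the classpath. The `LocationEvent` type, the `deviceId,lat,lon` input format, and the two lists are assumptions made up for illustration: the `valid` list stands in for the main output and `deadLetters` for the side output. In a real job the same checks would presumably live in a `ProcessFunction` that emits rejects via `ctx.output(...)` with an `OutputTag`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event type, not taken from the question.
final class LocationEvent {
    final String deviceId;
    final double lat;
    final double lon;

    LocationEvent(String deviceId, double lat, double lon) {
        this.deviceId = deviceId;
        this.lat = lat;
        this.lon = lon;
    }
}

// Plain-Java stand-in for a validate operator with two outputs.
final class ValidateStep {
    final List<LocationEvent> valid = new ArrayList<>(); // stands in for the main output
    final List<String> deadLetters = new ArrayList<>();  // stands in for the dead letter side output

    // Assumed input format: "deviceId,lat,lon".
    void process(String raw) {
        if (raw == null) {
            deadLetters.add(null);
            return;
        }
        String[] parts = raw.split(",");
        if (parts.length != 3) {
            deadLetters.add(raw);
            return;
        }
        try {
            double lat = Double.parseDouble(parts[1].trim());
            double lon = Double.parseDouble(parts[2].trim());
            // Written as a positive range check so that NaN is rejected as well.
            boolean inRange = lat >= -90 && lat <= 90 && lon >= -180 && lon <= 180;
            if (!inRange) {
                deadLetters.add(raw);
                return;
            }
            valid.add(new LocationEvent(parts[0].trim(), lat, lon));
        } catch (NumberFormatException e) {
            // Unparseable numbers are invalid input, not a bug: route to the dead letters.
            deadLetters.add(raw);
        }
    }
}
```

Note that only invalid input is caught here. Everything downstream of this step is still allowed to fail loudly.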


Now, you may have a service with strict SLAs where you want it to keep producing output even if that output is corrupted. Or you have a simple transformation pipeline where you can afford to miss some events as long as you keep the majority (and downstream can deal with incomplete data). Then you are right that you need to wrap operator code in try-catch. However, you would typically still do that only for the fragile operators and not for all of them. Trivial operators should be tested and then trusted to work. Further, you'd usually catch only specific kinds of exceptions, to limit the scope to the failures you actually expect.
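As a rough illustration of "catch only what you expect", the following plain-Java sketch wraps one fragile step and catches a single exception type. The `FragileParser` class and its integer-parsing task are hypothetical. Anything other than `NumberFormatException` still propagates, which in a Flink job would fail the task as before.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical fragile operator: parses an integer reading from raw text.
final class FragileParser {
    final List<Integer> results = new ArrayList<>();
    final List<String> skipped = new ArrayList<>();

    void process(String raw) {
        try {
            results.add(Integer.parseInt(raw.trim()));
        } catch (NumberFormatException e) {
            // The one failure mode we expect from bad input: skip the record and remember it.
            skipped.add(raw);
        }
        // No catch-all: a NullPointerException or any other unexpected error bubbles up.
    }
}
```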

You might wonder why Flink doesn't have it incorporated as a default pattern. There are two reasons as far as I can see:

  1. If Flink silently ignores any kind of exception and sends an extra message to a secondary sink, how can Flink ensure that the throwing operator is in a sane state afterwards? How can it avoid any kind of leaks that may happen because cleanup code is not executed?
  2. It's more common in Java to let developers explicitly reason about exceptions and exception handling. It's also not straightforward to see what the requirements are: Do you want to have the input only? Do you also want to store the exception? What about the operator state that may have influenced the outcome? Should Flink still fail when too many errors have been received in a given time window? It quickly becomes a huge feature for something that should not happen at all in an ideal world where high quality data is ingested and properly processed.

So while it looks easy for your case because you know exactly what information you want to store, it's not easy to build a solution for all purposes, especially since the extra code that a user has to write is tiny compared to a generic solution.


What you could do is extract most of the complicated logic into a single ProcessFunction and use side outputs as you have outlined. Since it's a central piece, you'd only need to write the side-output function once. If it's done multiple times, you could extract a helper function to which you pass your actual code as a RunnableWithException lambda, and which hides all the side-output logic. Make sure you use plenty of finally blocks to ensure a sane state.
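The helper idea could look roughly like this. It is a sketch in plain Java so that it runs without Flink: `ThrowingRunnable` is a local stand-in for the `RunnableWithException` role, and `FailedRecord`, `Guard`, and `runOrDeadLetter` are hypothetical names. Inside a `ProcessFunction`, the `deadLetter` consumer would presumably forward to `ctx.output(...)` with your `OutputTag`.

```java
import java.util.function.Consumer;

// Local stand-in for a lambda that may throw a checked exception.
@FunctionalInterface
interface ThrowingRunnable {
    void run() throws Exception;
}

// Hypothetical dead letter payload: the offending input plus a description of the failure.
final class FailedRecord<T> {
    final T input;
    final String error;

    FailedRecord(T input, String error) {
        this.input = input;
        this.error = error;
    }
}

final class Guard {
    // Runs body; on failure hands the input and error to deadLetter instead of rethrowing.
    // Returns true if body completed normally, false if the record was dead-lettered.
    static <T> boolean runOrDeadLetter(
            T input,
            ThrowingRunnable body,
            Consumer<FailedRecord<T>> deadLetter,
            Runnable cleanup) {
        try {
            body.run();
            return true;
        } catch (Exception e) {
            // Catches Exception only, so Errors such as OutOfMemoryError still propagate.
            deadLetter.accept(new FailedRecord<>(input, e.toString()));
            return false;
        } finally {
            // Always executed, so the operator is left in a sane state either way.
            cleanup.run();
        }
    }
}
```

A call site then shrinks to something like `Guard.runOrDeadLetter(event, () -> handle(event), dlq::add, this::resetBuffers)`, where `handle` and `resetBuffers` are placeholders for your own logic.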

I'd also add quite a few integration tests (IT cases) and use mutation testing to harden your pipeline more quickly. If you keep your test data inline, the mutants may also simulate exactly the kind of unexpected data issues you are worried about, which helps make your validate operator more complete.