
I am new to Flink. I am doing pattern evaluation using Flink's KeyedBroadcastProcessFunction, along similar lines to (https://flink.apache.org/2019/06/26/broadcast-state.html), and I am writing my code in Java. I don't understand how to handle exceptions if a failure happens while processing the DataStream. I searched a lot, but the only relevant things I found were the two links below:

Flink: what's the best way to handle exceptions inside Flink jobs

Apache Flink - exception handling in "keyBy"

In the first link, the user says he uses a side output in the ProcessFunction to capture errors. I also use a side output in my program to send out the data that does not match any pattern, but I don't understand how to send both errors and invalid data to the same side output.

In the second link, the user tries to add a sink to a keyBy function with a null key and a print sink function, which I didn't understand at all.

Can anyone please help me with the following:

1) Any documentation or a small code snippet for exception handling. I didn't find anything on the Flink documentation site.
2) Best practices for exception handling in Flink.

I didn't find any useful resources online, so an answer here will also be a useful reference for others.


1 Answer


You can have as many side outputs from a ProcessFunction as you like -- each will have its own unique OutputTag. So you can use one for unmatched data, and another for errors.

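import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.OutputTag;

// T is your (concrete) event type; the trailing {} lets Flink capture the generic type
final OutputTag<T> unmatchedTag = new OutputTag<T>("unmatched-data"){};
final OutputTag<String> errorTag = new OutputTag<String>("side-output-for-errors"){};

// the function emits to these with ctx.output(unmatchedTag, ...) and ctx.output(errorTag, ...)
SingleOutputStreamOperator<T> matchedData = ...;

DataStream<T> unmatched = matchedData.getSideOutput(unmatchedTag);
DataStream<String> errors = matchedData.getSideOutput(errorTag);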
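The snippet above shows how to collect the side outputs; the other half is emitting to them. A common approach (a suggestion, not something Flink enforces) is to wrap the per-record logic of processElement in try/catch and, instead of letting the exception escape, which fails the task and makes Flink restart the job according to its restart strategy, send the failing record to the error tag. The sketch below models that routing in plain Java so it runs without Flink on the classpath; PatternRouter, Outputs and the even-number isMatch rule are hypothetical stand-ins for your KeyedBroadcastProcessFunction, its Collector/ctx.output calls and your pattern check.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the "catch and route to a side output" pattern.
// PatternRouter, Outputs and isMatch are hypothetical stand-ins, not Flink APIs.
public class PatternRouter {

    // Stand-in for the main Collector plus two OutputTag-backed side outputs.
    public static final class Outputs {
        public final List<Integer> matched = new ArrayList<>();   // out.collect(...)
        public final List<String> unmatched = new ArrayList<>();  // ctx.output(unmatchedTag, ...)
        public final List<String> errors = new ArrayList<>();     // ctx.output(errorTag, ...)
    }

    // Hypothetical pattern rule: a record matches if it parses to an even number.
    static boolean isMatch(int value) {
        return value % 2 == 0;
    }

    // Mirrors the body of processElement: exceptions are caught and routed, never rethrown.
    public static void process(String raw, Outputs out) {
        try {
            int value = Integer.parseInt(raw.trim()); // throws on malformed input
            if (isMatch(value)) {
                out.matched.add(value);
            } else {
                out.unmatched.add(raw);
            }
        } catch (Exception e) {
            // Keep the offending input together with the reason so it can be inspected later.
            out.errors.add(raw + " -> " + e.getClass().getSimpleName() + ": " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        Outputs out = new Outputs();
        for (String raw : new String[] {"4", "7", "abc", " 10 "}) {
            process(raw, out);
        }
        System.out.println("matched=" + out.matched);
        System.out.println("unmatched=" + out.unmatched);
        System.out.println("errors=" + out.errors);
    }
}
```

In the real function, out.matched.add(value) would become out.collect(value), and the other two adds would become ctx.output(unmatchedTag, raw) and ctx.output(errorTag, ...). Catching the exception means one bad record no longer restarts the whole job, at the cost of having to monitor the error stream yourself.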

If you end up with several different operators each using side outputs to collect errors, then you could union them all together for reporting, which might look something like this:

final OutputTag<String> errors = new OutputTag<String>("side-output"){};

SingleOutputStreamOperator<T> task1 = ...;
SingleOutputStreamOperator<T> task2 = ...;
SingleOutputStreamOperator<T> task3 = ...;

DataStream<String> exceptions1 = task1.getSideOutput(errors);
DataStream<String> exceptions2 = task2.getSideOutput(errors);
DataStream<String> exceptions3 = task3.getSideOutput(errors);

DataStream<String> exceptions = exceptions1.union(exceptions2, exceptions3);

exceptions.addSink(new FlinkKafkaProducer(...));
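If you would rather have unmatched and failed records in a single side output (what the question asks about), or want to know which operator produced each entry after the union, one option is to give the side output a small wrapper type instead of a bare String. The class below is a hypothetical sketch of such a type; it uses public fields and a public no-argument constructor so Flink can treat it as a POJO, and it would be paired with a tag such as new OutputTag<SideOutputEvent>("dead-letters"){} (the tag name is also just an example).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical wrapper for a shared "dead letter" side output.
// Public non-final fields + public no-arg constructor keep it a Flink-style POJO.
public class SideOutputEvent {

    public enum Kind { UNMATCHED, FAILED }

    public Kind kind;
    public String operator; // which task emitted the event, e.g. "task1"
    public String payload;  // the original input, kept for inspection or replay
    public String reason;   // exception summary; null for UNMATCHED

    public SideOutputEvent() {}

    private static SideOutputEvent of(Kind kind, String operator, String payload, String reason) {
        SideOutputEvent e = new SideOutputEvent();
        e.kind = kind;
        e.operator = operator;
        e.payload = payload;
        e.reason = reason;
        return e;
    }

    // A record that was processed fine but matched no pattern.
    public static SideOutputEvent unmatched(String operator, String payload) {
        return of(Kind.UNMATCHED, operator, payload, null);
    }

    // A record whose processing threw; the exception class and message are kept as text.
    public static SideOutputEvent failed(String operator, String payload, Throwable t) {
        return of(Kind.FAILED, operator, payload, t.getClass().getName() + ": " + t.getMessage());
    }

    @Override
    public String toString() {
        return kind + " from " + operator + " payload=" + payload
                + (reason == null ? "" : " reason=" + reason);
    }

    public static void main(String[] args) {
        List<SideOutputEvent> deadLetters = new ArrayList<>();
        deadLetters.add(unmatched("task1", "7"));
        try {
            Integer.parseInt("abc");
        } catch (NumberFormatException ex) {
            deadLetters.add(failed("task2", "abc", ex));
        }
        deadLetters.forEach(System.out::println);
    }
}
```

Inside each operator you would then call ctx.output(deadLetterTag, SideOutputEvent.failed(...)) in the catch block and ctx.output(deadLetterTag, SideOutputEvent.unmatched(...)) for records that match no pattern (deadLetterTag being a hypothetical name for the shared tag), and filter on kind downstream. Keeping two separate tags, as in the answer, stays simpler when the two kinds go to different sinks.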