
I am new to Flink. I am doing pattern matching with Apache Flink, where the list of patterns is kept in broadcast state and the processElement function iterates through those patterns to find a match. The patterns are read from a database once at startup (a one-time activity). Below is my code.

MapStateDescriptor and side output tag as below:

public static final MapStateDescriptor<String, String> ruleDescriptor=
        new MapStateDescriptor<String, String>("RuleSet", BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);

public final static OutputTag<Tuple2<String, String>> unMatchedSideOutput =
        new OutputTag<Tuple2<String, String>>(
                "unmatched-side-output") {
        };

processElement and processBroadcastElement functions as below:

@Override
public void processElement(Tuple2<String, String> inputValue, ReadOnlyContext ctx,
        Collector<Tuple2<String, String>> out) throws Exception {

    boolean ruleMatched = false;

    for (Map.Entry<String, String> ruleSet :
            ctx.getBroadcastState(ruleDescriptor).immutableEntries()) {

        String ruleName = ruleSet.getKey();
        String ruleCondition = ruleSet.getValue();

        // If a rule in the rule set matches, send the event to the main output and stop checking.
        // evaluateRule(...) stands in for the (omitted) rule evaluation logic.
        if (evaluateRule(ruleCondition, inputValue.f1)) {
            out.collect(new Tuple2<>(inputValue.f0, inputValue.f1));
            ruleMatched = true;
            break;
        }
    }

    // Write to the side output only if no rule matched
    if (!ruleMatched) {
        ctx.output(unMatchedSideOutput, new Tuple2<>("No Rule Detected", inputValue.f1));
    }
}

@Override
public void processBroadcastElement(Tuple2<String, String> ruleSetConditions, Context ctx,
        Collector<Tuple2<String, String>> out) throws Exception {
    ctx.getBroadcastState(ruleDescriptor).put(ruleSetConditions.f0, ruleSetConditions.f1);
}

Main function as below:

   public static void main(String[] args) throws Exception {

        //Initiate a stream execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //Read the incoming upstream data
        DataStream<String> incomingSignal =
                env.readTextFile(....);

        //Read the patterns available in the configuration file
        DataStream<String> rawPatternStream =
                env.readTextFile(....);

        //Generate key/value pairs of patterns where the key is the pattern name and the value is the pattern condition
        DataStream<Tuple2<String, String>> ruleStream =
                rawPatternStream.flatMap(new FlatMapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public void flatMap(String ruleCondition, Collector<Tuple2<String, String>> out) throws Exception {
                        String[] rules = ruleCondition.split(",");
                        out.collect(new Tuple2<>(rules[0], rules[1]));
                    }
                });

        //Broadcast the patterns to all Flink operators; they are stored in operator memory
        BroadcastStream<Tuple2<String, String>> ruleBroadcast = ruleStream.broadcast(ruleDescriptor);

        //Create a keyed stream using sourceName as the key
        DataStream<Tuple2<String, String>> matchSignal =
                incomingSignal.map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String incomingSignal) throws Exception {
                        String sourceName = incomingSignal.split(",")[0];
                        return new Tuple2<>(sourceName, incomingSignal);
                    }
                }).keyBy(0).connect(ruleBroadcast)
                  .process(new KeyedBroadCastProcessFunction()); // the custom KeyedBroadcastProcessFunction shown above

        matchSignal.print("RuleDetected=>");
}

I have a couple of questions:

1) Currently I am reading the rules from a database. How can I update the broadcast state while the Flink job is running on the cluster? If I receive a new set of rules from a Kafka topic, how can I update the broadcast state in the processBroadcastElement method of the KeyedBroadcastProcessFunction?

2) When the broadcast state is updated, do we need to restart the Flink job?

Please help me with the above questions.

Hey, please add a code example to allow people to better understand the issue. - Dominik Wosiński
@DominikWosiński I updated the question with my code. Can you please suggest how I can update the broadcast state if there are any changes to the rules? - YRK

1 Answer


The only way to either set or update broadcast state is in the processBroadcastElement method of a BroadcastProcessFunction or KeyedBroadcastProcessFunction. All you need to do is to adapt your application to stream in the rules from a streaming source, rather than reading them once from a file.
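For example, if the rules arrive on a Kafka topic as the same "name,condition" strings your file contains, a minimal sketch (the topic name, broker address, and group id below are placeholders, not anything your setup requires) could look like this inside main:

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("group.id", "rule-consumer");

// Each Kafka record is expected to be "ruleName,ruleCondition"
DataStream<Tuple2<String, String>> ruleStream = env
        .addSource(new FlinkKafkaConsumer<>("rules", new SimpleStringSchema(), kafkaProps))
        .map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String value) {
                String[] parts = value.split(",");
                return new Tuple2<>(parts[0], parts[1]);
            }
        });

// Broadcast with the same ruleDescriptor you already declared
BroadcastStream<Tuple2<String, String>> ruleBroadcast = ruleStream.broadcast(ruleDescriptor);

The rest of your pipeline stays the same: every new record on the topic reaches processBroadcastElement and updates the broadcast state while the job keeps running, so no restart is needed.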

Broadcast state is a hash map. If your broadcast stream includes a new key/value pair that uses the same key as an earlier broadcast event, then the new value will replace the old one. Otherwise you'll end up with an entirely new entry.
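If you also need to delete rules, that is handled in processBroadcastElement as well. Here is a sketch, assuming the (purely illustrative) convention that an empty condition means "remove this rule":

@Override
public void processBroadcastElement(Tuple2<String, String> rule, Context ctx,
        Collector<Tuple2<String, String>> out) throws Exception {
    BroadcastState<String, String> state = ctx.getBroadcastState(ruleDescriptor);
    if (rule.f1 == null || rule.f1.isEmpty()) {
        state.remove(rule.f0);        // assumed convention: empty condition deletes the rule
    } else {
        state.put(rule.f0, rule.f1);  // put() adds a new rule or overwrites an existing one
    }
}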

If you use readFile with FileProcessingMode.PROCESS_CONTINUOUSLY, then every time you modify the file its entire contents will be reingested. You could use that mechanism to update your set of rules.
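A sketch of that approach, with a placeholder file path and a 60-second polling interval:

// Re-reads the whole file every time it is modified; the path and interval are placeholders
DataStream<String> rawPatternStream = env.readFile(
        new TextInputFormat(new Path("/path/to/rules.csv")),
        "/path/to/rules.csv",
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        60000L);

Keep in mind that each modification re-ingests the full file, so all of the rules are re-broadcast, not just the ones that changed.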