
I am defining certain variables in one Java class and accessing them from a different class in order to filter the stream for unique elements. Please refer to the code below to understand the issue better.

The problem I am facing is that this filter function doesn't work well and fails to filter the stream for unique events. I suspect the variable is shared among different threads and that this is the cause. Please suggest another method if this is not the correct way to do it. Thanks in advance.

**ClassWithVariables.java**
public static HashMap<String, ArrayList<String>> uniqueMap = new HashMap<>();


**FilterClass.java**
public boolean filter(String val) throws Exception {
    if (ClassWithVariables.uniqueMap.containsKey(key)) {
        ArrayList<String> al = ClassWithVariables.uniqueMap.get(key);
        if (al.contains(val)) {
            return false;
        } else {
            // Update the hashmap list (uniqueMap)
            return true;
        }
    } else {
        // Add to hashmap list (uniqueMap)
        return true;
    }
}

1 Answer


The correct way to de-duplicate a stream is to partition the stream by the key, so that all elements with the same key are processed by the same worker, and to use Flink's managed, keyed state mechanism so that the state is fault-tolerant and rescalable. Here's a sample implementation:

// Imports for a recent Flink release; exact packages may vary slightly between versions.
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  // Event and EventSource stand in for your own event type and source.
  // keyBy routes all events with the same key to the same parallel instance.
  env.addSource(new EventSource())
    .keyBy(e -> e.key)
    .flatMap(new Deduplicate())
    .print();

  env.execute();
}

public static class Deduplicate extends RichFlatMapFunction<Event, Event> {
  // Keyed state: Flink maintains a separate "seen" flag for every key, fault-tolerantly.
  ValueState<Boolean> seen;

  @Override
  public void open(Configuration conf) {
    ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("seen", Types.BOOLEAN);
    seen = getRuntimeContext().getState(desc);
  }

  @Override
  public void flatMap(Event event, Collector<Event> out) throws Exception {
    // Emit only the first event seen for each key; later duplicates are dropped.
    if (seen.value() == null) {
      out.collect(event);
      seen.update(true);
    }
  }
}

This could also be implemented as a RichFilterFunction, btw. But note that if you have an unbounded key space, the state will grow indefinitely until you run out of heap or disk space, depending on which of Flink's state backends you choose. If this is an issue, you might want to set up a state retention policy via State Time-to-Live.
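
In case it helps, here is a minimal sketch of that filter-based variant, assuming the same Event type and keyed stream as above; the class name DeduplicateFilter is just illustrative, and it needs org.apache.flink.api.common.functions.RichFilterFunction in addition to the imports shown earlier:

public static class DeduplicateFilter extends RichFilterFunction<Event> {
  // Same per-key "seen" flag as in the flatMap version.
  ValueState<Boolean> seen;

  @Override
  public void open(Configuration conf) {
    ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("seen", Types.BOOLEAN);
    seen = getRuntimeContext().getState(desc);
  }

  @Override
  public boolean filter(Event event) throws Exception {
    // Keep the first event for each key, drop every later duplicate.
    if (seen.value() == null) {
      seen.update(true);
      return true;
    }
    return false;
  }
}

It would be wired in with keyBy(e -> e.key).filter(new DeduplicateFilter()) in place of the flatMap.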
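
And here is a rough sketch of what enabling State Time-to-Live on that state descriptor could look like; the 24-hour window and the update/visibility settings are illustrative choices, not something the code above prescribes. It needs org.apache.flink.api.common.state.StateTtlConfig and org.apache.flink.api.common.time.Time as additional imports:

@Override
public void open(Configuration conf) {
  // Expire each "seen" entry 24 hours after it was last written (illustrative window).
  StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(24))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

  ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("seen", Types.BOOLEAN);
  desc.enableTimeToLive(ttlConfig);
  seen = getRuntimeContext().getState(desc);
}

The trade-off is that once an entry expires, a duplicate arriving after the window will be treated as new and pass through again; that's the price of bounding the state.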

Note also that sharing state between different parts of a Flink pipeline isn't possible. You need to turn things inside-out compared to what might seem natural, and bring the event stream to the state, rather than fetching the state to the events.