
I am trying to use the StochasticOutlierSelection model of the Apache Flink ML package.

I cannot work out how to use it with Kafka as the data source. I understand it needs a DataSet rather than a DataStream, but I can't seem to find a way to window my Kafka DataStream into a DataSet.

Is there a way to treat my stream as a series of small DataSets? For instance, is there a way to say that every 10 elements in the stream that match a pattern (a count-based sliding window keyed by the element's unique ID) should be treated as a fixed-size DataSet, so that I can detect outliers within that fixed-size dataset?

The scenario I am looking to create is:

data source -> Kafka Topic 1 -> Flink pre-processing -> Kafka Topic 2 -> Flink Groups By ID -> Outlier Detection on the groups

I already have a working implementation up to the pre-processing stage, and am hoping Flink will be able to meet the rest of my requirements.
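For reference, the consuming side of Topic 2 currently looks roughly like this (the topic name, record format, ID field, and connector version are simplified placeholders; adjust to your setup):

    import java.util.Properties

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema

    // simplified stand-in for the pre-processed records on Topic 2
    case class Event(id: String, value: Double)

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "outlier-detection")

    // read the pre-processed records from the second topic
    val stream = env
      .addSource(new FlinkKafkaConsumer010[String]("topic-2", new SimpleStringSchema(), props))
      .map { line =>
        val Array(id, v) = line.split(",")   // placeholder CSV format
        Event(id, v.toDouble)
      }

    // group by the element's unique ID; this is where I would like the
    // per-key batches of 10 elements to become DataSets for outlier detection
    val keyed = stream.keyBy(_.id)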


1 Answer


I guess you could create a count-triggered global window and use the ExecutionEnvironment to turn the window contents into a DataSet. Something like the following might work (getResult would return a DataSet):


    import org.apache.flink.api.common.functions.AggregateFunction
    import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
    import org.apache.flink.ml.common.LabeledVector
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
    import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

    stream
      .keyBy(...)                                   // key by the element's unique ID
      .window(GlobalWindows.create())
      // wrap in PurgingTrigger.of(...) if each firing should start from an empty window
      .trigger(CountTrigger.of[GlobalWindow](10))   // fire once 10 elements have arrived for the key
      .aggregate(new MyAggregator())
      ...

    // Collects the elements of a window and exposes them as a DataSet in getResult.
    class MyAggregator
        extends AggregateFunction[LabeledVector, List[LabeledVector], DataSet[LabeledVector]] {

      override def createAccumulator(): List[LabeledVector] = List.empty

      override def add(value: LabeledVector, acc: List[LabeledVector]): List[LabeledVector] =
        value :: acc

      override def merge(a: List[LabeledVector], b: List[LabeledVector]): List[LabeledVector] =
        a ++ b

      override def getResult(acc: List[LabeledVector]): DataSet[LabeledVector] =
        ExecutionEnvironment.getExecutionEnvironment.fromCollection(acc)
    }
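
The DataSet returned by getResult could then, in principle, be fed into FlinkML's StochasticOutlierSelection. A minimal sketch, assuming windowDataSet is one of those per-window DataSet[LabeledVector] values and using an arbitrary perplexity value:

    import org.apache.flink.ml.outlier.StochasticOutlierSelection

    // windowDataSet: DataSet[LabeledVector] produced by getResult above (placeholder name)
    val sos = new StochasticOutlierSelection().setPerplexity(3)

    // per the FlinkML docs, transform yields an outlier probability per input vector
    val outlierScores = sos.transform(windowDataSet).collect()

Note that collect() here kicks off a separate batch job from inside a streaming pipeline, which is somewhat unusual, so treat this as an experiment rather than a production pattern.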