I'm currently trying to implement some algorithms in both Apache Spark and Apache Flink. When executing these algorithms, I have to perform some kind of set difference/subtraction operation.
While Apache Spark has a built-in subtract operation, I couldn't find anything similar in Apache Flink (1.0.3 and 1.1.0-SNAPSHOT).
So my question is: given two DataSet objects d1 and d2, both containing the same type T, what is the most efficient way to apply the set difference, i.e. d1 \ d2?
val d1: DataSet[T] = ...
val d2: DataSet[T] = ...
val d_diff: DataSet[T] = ???
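To make the expected semantics concrete, here is a plain-Scala illustration of the result I'm after (just local collections, no Flink involved):

```scala
// The semantics I want: keep every element of d1 that does not occur in d2.
val d1 = Seq(1, 2, 2, 3, 4)
val d2 = Seq(2, 4, 5)
val diff = d1.filterNot(d2.toSet) // d1 \ d2 → Seq(1, 3)
```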
Probably there is some way to do it via coGroup:
import org.apache.flink.util.Collector

// key on the whole element, since complete records are being compared
// (this requires T to be a valid Flink key type, e.g. a case class)
val d_diff = d1.coGroup(d2).where(x => x).equalTo(x => x) {
  (l, r, out: Collector[T]) => {
    val rightElements = r.toSet
    for (el <- l if !rightElements.contains(el))
      out.collect(el)
  }
}
but I'm wondering whether that's the correct way or even best practice, or whether anybody knows a more efficient way?