I am new to Apache Flink. I have an unbounded data stream as input (fed into Flink 0.10 via Kafka).
I want to keep only the first occurrence of each primary key (the primary key is the combination of contract_num and event_dt).
These "duplicates" arrive almost immediately after one another.
The source system cannot filter them out for me, so Flink has to do it.
Here is my input data:
contract_num, event_dt, attr
A1, 2016-02-24 10:25:08, X
A1, 2016-02-24 10:25:08, Y
A1, 2016-02-24 10:25:09, Z
A2, 2016-02-24 10:25:10, C
Here is the output data I want:
A1, 2016-02-24 10:25:08, X
A1, 2016-02-24 10:25:09, Z
A2, 2016-02-24 10:25:10, C
Note that the 2nd row has been removed because the key combination of A1 and '2016-02-24 10:25:08' already occurred in the 1st row.
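To pin down the semantics I am after, here is a minimal plain-Java sketch of the rule. It is not Flink code: the class name FirstOccurrenceFilter and the method firstPerKey are hypothetical, and rows are modelled as plain String arrays purely for illustration. It remembers every (contract_num, event_dt) pair it has seen and drops any later row with the same pair.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration class; not part of Flink or of my actual job.
class FirstOccurrenceFilter {

    // Each row is {contract_num, event_dt, attr}.
    // Returns the rows in their original order, keeping only the first row
    // seen for each (contract_num, event_dt) pair.
    static List<String[]> firstPerKey(List<String[]> rows) {
        // Assumption: all seen keys fit in memory. On an unbounded stream this
        // set would grow forever, so a real job would need to expire old keys.
        Set<List<String>> seenKeys = new HashSet<>();
        List<String[]> result = new ArrayList<>();
        for (String[] row : rows) {
            List<String> key = Arrays.asList(row[0], row[1]);
            // Set.add returns false when the key is already present.
            if (seenKeys.add(key)) {
                result.add(row);
            }
        }
        return result;
    }
}
```

Called on the four input rows above, firstPerKey returns the X, Z and C rows in that order. What I cannot work out is how to express the same "have I seen this key before?" check per key in Flink after keyBy(0,1).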
How can I do this with Flink 0.10?
I was thinking about using keyBy(0,1), but I don't know what to do after that.
(I used joda-time and org.flinkspector to set up these tests.)
@Test
public void test() {
    // Timestamps relative to now; threeSecondsAgo is reused below to produce a duplicate key.
    DateTime threeSecondsAgo = (new DateTime()).minusSeconds(3);
    DateTime twoSecondsAgo = (new DateTime()).minusSeconds(2);
    DateTime oneSecondsAgo = (new DateTime()).minusSeconds(1);

    DataStream<Tuple3<String, Date, String>> testStream =
        createTimedTestStreamWith(
                Tuple3.of("A1", threeSecondsAgo.toDate(), "X"))
            .emit(Tuple3.of("A1", threeSecondsAgo.toDate(), "Y"), after(0, TimeUnit.NANOSECONDS))
            .emit(Tuple3.of("A1", twoSecondsAgo.toDate(), "Z"), after(0, TimeUnit.NANOSECONDS))
            .emit(Tuple3.of("A2", oneSecondsAgo.toDate(), "C"), after(0, TimeUnit.NANOSECONDS))
            .close();

    // Key by contract_num (field 0) and event_dt (field 1); the deduplication step is still missing.
    testStream.keyBy(0,1);
}