I'm learning/experimenting with Flink and I'm observing some unexpected behavior with the DataStream join. I would like to understand what is happening.
Let's say I have two streams with 10 records each, which I want to join on an id field. Let's assume that each record in one stream has a matching record in the other, and that the IDs are unique within each stream. Let's also say I have to use a global window (this is a requirement).
Join using DataStream API (my simplified code in Scala):
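import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger

val stream1 = ... // from a Kafka topic on my local machine (I tried with and without .keyBy)
val stream2 = ...
stream1
  .join(stream2)
  .where(_.id).equalTo(_.id)
  .window(GlobalWindows.create()) // assume this is a requirement
  .trigger(CountTrigger.of(1)) // fire on every incoming record
  .apply {
    (row1, row2) => // ...
  }
  .print()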
Result:
- Everything is printed as expected, each record from the first stream joined with a record from the second one.
However:
- If I re-send one of the records (say, with an updated field) from one of the streams to that stream, two duplicate join events get emitted.
- If I repeat that operation (with or without an updated field), I get 3 emitted events, then 4, 5, and so on.
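
To make the counts concrete, here is a minimal plain-Scala sketch (no Flink involved) that reproduces them. It rests on an assumption I have not verified against Flink itself: that the global window never drops records, and that each trigger firing re-joins everything buffered so far for the key. Rec, JoinModel, onLeft and onRight are made-up names for illustration only.

```scala
// Plain-Scala model of the observed counts; this is NOT Flink code.
// Assumption: nothing is ever purged from the window, and every incoming
// record triggers a join over all records buffered for its key.
final case class Rec(id: Int, payload: String)

final class JoinModel {
  private var left: Map[Int, Vector[Rec]] = Map.empty
  private var right: Map[Int, Vector[Rec]] = Map.empty

  // Buffer a record from the first stream, then "fire" for its key.
  def onLeft(r: Rec): Seq[(Rec, Rec)] = {
    left = left.updated(r.id, left.getOrElse(r.id, Vector.empty) :+ r)
    fire(r.id)
  }

  // Buffer a record from the second stream, then "fire" for its key.
  def onRight(r: Rec): Seq[(Rec, Rec)] = {
    right = right.updated(r.id, right.getOrElse(r.id, Vector.empty) :+ r)
    fire(r.id)
  }

  // Emit the cross product of everything buffered for this key.
  private def fire(id: Int): Seq[(Rec, Rec)] =
    for {
      l <- left.getOrElse(id, Vector.empty)
      r <- right.getOrElse(id, Vector.empty)
    } yield (l, r)
}

object JoinModelDemo {
  def main(args: Array[String]): Unit = {
    val model = new JoinModel
    println(model.onLeft(Rec(1, "a")).size)   // 0: nothing on the other side yet
    println(model.onRight(Rec(1, "x")).size)  // 1: the initial, expected join
    println(model.onLeft(Rec(1, "a2")).size)  // 2: first re-send
    println(model.onLeft(Rec(1, "a3")).size)  // 3: second re-send
  }
}
```

Under that assumption, the number of emitted events grows by one with each re-sent record, which matches the 2, 3, 4, 5... I see.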
Could someone in the Flink community explain why this is happening? I would have expected only 1 event emitted each time. Is it possible to achieve this with a global window?
In comparison, the Flink Table API behaves as expected in that same scenario, but for my project I'm more interested in the DataStream API.
Example with Table API, which worked as expected:
tableEnv
  .sqlQuery(
    """
      |SELECT *
      |  FROM stream1
      |  JOIN stream2
      |    ON stream1.id = stream2.id
    """.stripMargin)
  .toRetractStream[Row]
  .filter(_._1) // just keep the inserts
  .map(...)
  .print() // works as expected, after re-sending updated records
Thank you,
Nicolas