I'm learning how the snapshot mechanism works in Flink.
As I understand it, the JobManager inserts barriers into each data source at a fixed interval, and each operator takes a snapshot once it has received the nth barrier from all of its inputs.
If I'm right, it seems that this mechanism may use more and more memory in some cases.
Here is an example:
Say there are two data sources, Source 1 and Source 2, and one Operator.
Source 1 -----\
               ------ Operator
Source 2 -----/
Source 1 is generating the integer stream: 1, 2, 3, 4, 5...
Source 2 is generating the character stream: a, b, c, d, e...
The Operator does this: it takes two inputs from Source 1 and one input from Source 2 to generate an output: 1a2, 3b4, 5c6, 7d8...
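To make the Operator's behaviour concrete, here is a minimal sketch in plain Python. It is not Flink code, and `combine` is just a name I made up for illustration; it only shows the "two from Source 1, one from Source 2" pairing described above.

```python
def combine(ints, chars):
    """Toy model of the Operator: take int, char, int and emit e.g. '1a2'."""
    ints, chars = iter(ints), iter(chars)
    while True:
        try:
            first = next(ints)    # first input from Source 1
            ch = next(chars)      # one input from Source 2
            second = next(ints)   # second input from Source 1
        except StopIteration:
            return                # one of the inputs ran dry; stop emitting
        yield f"{first}{ch}{second}"


result = list(combine(range(1, 9), "abcd"))
print(result)
```

Note that each output uses up two integers but only one character, which is the imbalance the rest of the question is about.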
Let's say the JobManager inserts the barriers into the two data sources as below:
1, BARRIER A, 2, BARRIER B, 3, BARRIER C, 4, BARRIER D, 5...
a, BARRIER A, b, BARRIER B, c, BARRIER C, d, BARRIER D, e...
Now let's begin.
When the two "BARRIER A"s from Source 1 and Source 2 enter the Operator, Flink will take a snapshot of the Operator, whose current state is 1 and a, because 1 and a were already in the Operator when BARRIER A entered it.
Then, when the two "BARRIER B"s enter the Operator, the Operator has finished its first task, generating 1a2, and Flink will take another snapshot: NA, b. NA means there is currently no new input from Source 1.
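Here is a toy simulation of the walkthrough above, again in plain Python rather than Flink. It models my understanding only: the names `run`, `BARRIER`, `source1` and `source2` are made up, the "state" is simply the list of inputs not yet consumed, and I assume both inputs are read in lockstep so matching barriers arrive together.

```python
BARRIER = "BARRIER"  # hypothetical marker; barriers are modelled as (BARRIER, id)


def run(source1, source2):
    """Feed two streams to a toy operator in lockstep; return (outputs, snapshots)."""
    ints, chars = [], []      # operator state: inputs received but not yet used
    outputs, snapshots = [], {}
    seen = {}                 # barrier id -> set of channels that delivered it
    for ev1, ev2 in zip(source1, source2):
        for channel, event in ((1, ev1), (2, ev2)):
            if isinstance(event, tuple) and event[0] == BARRIER:
                seen.setdefault(event[1], set()).add(channel)
                if seen[event[1]] == {1, 2}:
                    # Barrier received on both inputs: copy the current state.
                    snapshots[event[1]] = (list(ints), list(chars))
                continue
            (ints if channel == 1 else chars).append(event)
            if len(ints) >= 2 and chars:
                # Two ints and one char are available: emit e.g. '1a2'.
                outputs.append(f"{ints.pop(0)}{chars.pop(0)}{ints.pop(0)}")
    return outputs, snapshots


source1 = [1, (BARRIER, "A"), 2, (BARRIER, "B"), 3, (BARRIER, "C"), 4, (BARRIER, "D")]
source2 = ["a", (BARRIER, "A"), "b", (BARRIER, "B"), "c", (BARRIER, "C"), "d", (BARRIER, "D")]
outputs, snapshots = run(source1, source2)
for barrier_id, state in snapshots.items():
    print(barrier_id, state)
```

In this model the snapshot at BARRIER A is `([1], ['a'])` and the one at BARRIER B is `([], ['b'])`, which matches the 1, a and NA, b states described above. By BARRIER D the pending characters are `['c', 'd']`.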
At the same time, each snapshot will be stored in RAM, a file system, or RocksDB (depending on how we configure Flink).
If I'm right, I think Flink will generate more and more snapshots in this example, because Source 1 is always consumed twice as fast as Source 2.
Did I misunderstand something?