
I have been reading the Flink docs and need a few clarifications. Hopefully someone can help me out here.

State Backend - This basically refers to where the data for my operations is stored. For example, if I'm doing an aggregation over a 2 hr window, this is where the buffered data is kept. As pointed out in the docs, for large state we should use RocksDB.

The RocksDBStateBackend holds in-flight data in a RocksDB database that is (per default) stored in the TaskManager data directories

Does in-flight data here refer to incoming data, say from a Kafka stream, that has not yet been checkpointed?

Upon checkpointing, the whole RocksDB database will be checkpointed into the configured file system and directory. Minimal metadata is stored in the JobManager’s memory

When using RocksDB, when a checkpoint is created, the entire buffered data is stored on disk. Then, say, when the window is triggered at the end of the 2 hr, will this state that was stored on disk be deserialised and used for the operation?

Note that the amount of state that you can keep is only limited by the amount of disk space available

Does this mean that I could run an analytical query on a potentially high-throughput stream with very limited resources? Suppose my Kafka stream has a rate of 50k messages/sec. Could I then run it on a single core on my EMR cluster, with the tradeoff being that Flink won't be able to keep up with the incoming rate and will lag, but given enough disk space it won't hit an OOM error?
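To put rough numbers on this scenario, here is a back-of-envelope sketch. Only the 50k messages/sec rate and the 2 hr window come from the question; the 200-byte message size and the 20k messages/sec single-core processing rate are made-up assumptions for illustration, and the sketch assumes the window buffers every raw message rather than keeping a running aggregate.

```python
# Back-of-envelope sizing for the scenario in the question.
# MSG_BYTES and RATE_PROCESSED are illustrative assumptions, not measurements.
RATE_IN = 50_000               # messages/sec (from the question)
WINDOW_SECONDS = 2 * 60 * 60   # 2 hr window (from the question)
MSG_BYTES = 200                # assumed average serialized message size
RATE_PROCESSED = 20_000        # assumed rate a single core can sustain


def buffered_window_bytes(rate, window_seconds, msg_bytes):
    """Bytes of state if every message in the window is buffered."""
    return rate * window_seconds * msg_bytes


def backlog_after(seconds, rate_in, rate_processed):
    """Unprocessed messages accumulated after `seconds` of falling behind."""
    return max(0, rate_in - rate_processed) * seconds


window_bytes = buffered_window_bytes(RATE_IN, WINDOW_SECONDS, MSG_BYTES)
backlog = backlog_after(3600, RATE_IN, RATE_PROCESSED)

print(f"state for one full window: {window_bytes / 1e9:.0f} GB")
print(f"backlog after one hour: {backlog:,} messages")
```

Under these assumptions a single window holds about 72 GB, which fits on disk far more easily than in heap, while the backlog keeps growing for as long as the processing rate stays below the input rate.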

When a checkpoint is completed, I assume that the checkpoint metadata (like the HDFS or S3 path from each TM) from all the TMs will be sent to the JM? In case of TM failure, the JM will spin up a new JM and restore the state from the last checkpoint.

The default setting for the JM in flink-conf.yaml is jobmanager.heap.size: 1024m.
My confusion here is why the JM needs 1 GB of heap memory. What does a JM handle apart from synchronisation among TMs? How do I actually decide how much memory should be configured for the JM in production?

Can someone verify whether my understanding is correct and point me in the right direction? Thanks in advance!


1 Answer


Overall your understanding appears to be correct. One point: in the case of a TM failure, the JM will spin up a new TM and restore the state from the last checkpoint (rather than spinning up a new JM).

But to be a bit more precise, in the last few releases of Flink, what used to be a monolithic job manager has been refactored into separate components: a dispatcher that receives jobs from clients and starts new job managers as needed; a job manager that is concerned only with providing services to a single job; and a resource manager that starts up new TMs as needed. The resource manager is the only component that is cluster-framework specific -- e.g., there is a YARN resource manager.

The job manager has other roles as well -- it is the checkpoint coordinator and the API endpoint for the web UI and metrics.

How much heap the JM needs is somewhat variable. The defaults were chosen to try to cover more than a narrow set of situations, and to work out of the box. Also, by default, checkpoints go to the JM heap, so it needs some space for that. If you have a small cluster and are checkpointing to a distributed filesystem, you should be able to get by with less than 1GB.