We're currently trying to figure out how to work with Flink efficiently, and we're still making sense of everything.
We are running about 60 very lightweight jobs on a standalone cluster, which works fine on an average EC2 instance. However, once I enabled checkpointing with a local RocksDB state backend, the cluster behaved unexpectedly: it stopped the jobs, tried to restart them, then discarded all of them and left the error logs empty. After that, no trace of either the jobs or the JARs was left in Flink.
I am aware that for each job a fraction of the total JobManager memory is reserved, and likewise that a local RocksDB instance is created for each job on the same machine, but I assumed these would be equally lightweight and not require much memory or CPU capacity. Just adding the line env.enableCheckpointing(1000); led to a total failure of everything, whereas the cluster had been stable before.
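In case it is purely a tuning issue: the knobs we are aware of are the checkpoint interval, the minimum pause between checkpoints, and RocksDB's incremental mode. A minimal sketch of what we could try, assuming Flink 1.9.1 (the interval values are arbitrary and the path is a placeholder, not our real one):

// Hypothetical tuning, not our current setup: a longer interval plus a
// forced pause so the checkpoints of ~60 jobs don't constantly overlap.
env.enableCheckpointing(60_000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);

try {
    // The second constructor argument enables incremental checkpoints:
    // only the delta since the last checkpoint is written, not a full snapshot.
    env.setStateBackend(new RocksDBStateBackend("file:///somePathOnInstance", true));
} catch (IOException e) {
    e.printStackTrace();
}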
I personally think we may have reached the limit of our standalone Flink cluster, and that even increasing the memory would no longer suffice, but I'd like confirmation on that before we start building a distributed Flink cluster (we'd need to automate everything, which is why I'm hesitant right now). I am not sure whether storing the RocksDB checkpoints in some dedicated storage such as S3 would even tackle this problem, or whether the resource consumption (other than hard disk) would be affected at all.
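For completeness, the S3 variant we have in mind would, as far as we understand, only swap the checkpoint URI; RocksDB's live working state would still sit on the local disk of the machine running the job, so only the snapshot traffic would move off the instance. A sketch, assuming an S3 filesystem plugin is on the classpath (the bucket name is made up):

try {
    // Checkpoint snapshots go to S3; the running RocksDB instances and
    // their memory/CPU footprint stay on the local machine.
    env.setStateBackend(new RocksDBStateBackend("s3://some-bucket/flink-checkpoints"));
} catch (IOException e) {
    e.printStackTrace();
}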
Is moving to a distributed environment the only way to solve our problem, or does this indicate some other problem that could be fixed by proper configuration?
Edit: Maybe I should add that there is no load yet; we are not yet talking about incoming data, but about keeping the jobs running at all. There are a mere 100 records in the FlinkSources right now, and we don't even reach the point of those being processed.
Edit2:
This snippet has always been part of the jobs' code:
try {
    env.setStateBackend((StateBackend) new RocksDBStateBackend("file://" + "/somePathOnInstance"));
} catch (IOException e1) {
    // Log and continue with the default state backend if the path cannot be initialized
    e1.printStackTrace();
}
and we added the following line of code:
env.enableCheckpointing(CHECKPOINTING_INTERVAL_MS);
The typecast to StateBackend should not be necessary, since according to the documentation the 1.9.1 version of the RocksDBStateBackend class should implement StateBackend directly rather than extend AbstractStateBackend. However, the docs do not match the actual class we get from Maven, so there's that.
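For comparison, this is the cast-free call we expected to be able to write based on the docs (same placeholder path as above):

try {
    // Without the cast, overload resolution presumably picks the deprecated
    // setStateBackend(AbstractStateBackend) overload in 1.9.1, since the
    // class from Maven still extends AbstractStateBackend.
    env.setStateBackend(new RocksDBStateBackend("file://" + "/somePathOnInstance"));
} catch (IOException e1) {
    e1.printStackTrace();
}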