
Currently, we're trying to figure out how to work with Flink efficiently, and we're still trying to make sense of everything.

We are running about 60 really lightweight jobs on a standalone cluster, which works fine on an average EC2 instance. However, once I enabled checkpointing with a local RocksDB state backend, the cluster started behaving unexpectedly: it stops jobs, tries to restart them, and then discards all of them, leaving the error logs empty. After that, no trace of either jobs or jars is left in Flink. I am aware that a fraction of the total JobManager memory is reserved for each job, and that a local RocksDB instance is created for each job on the same machine, but I assumed these would be equally lightweight and not require much memory or CPU capacity. Just adding the line env.enableCheckpointing(1000); led to a total failure of everything, as opposed to a stable cluster before.

I personally think we may have reached the limit of our standalone Flink cluster and that even increasing the memory would no longer suffice, but I'd like confirmation of that before we start building a distributed Flink cluster (we'd need to automate everything, which is why I'm hesitant right now). I am not sure whether storing the RocksDB checkpoints in dedicated storage such as S3 would even address this problem, or whether resource consumption (other than disk space) would be affected at all.

Is moving to a distributed environment the only way to solve our problem or does this indicate some other problem, which could be fixed by proper configuration?

Edit: Maybe I should add that there is no load yet. We are not talking about incoming data yet, only about keeping the jobs running. There are a mere 100 records in the FlinkSources right now, but we don't even reach the point where those are processed.

Edit2:

This part was always part of the jobs' code:

try {
    env.setStateBackend((StateBackend) new RocksDBStateBackend("file://" + "/somePathOnInstance"));
} catch (IOException e1) {
    // TODO Auto-generated catch block
    e1.printStackTrace();
}

and we added the following line of code:

env.enableCheckpointing(CHECKPOINTING_INTERVAL_MS);

The typecast to StateBackend should not be necessary, as the 1.9.1 version of the RocksDBStateBackend class should already implement StateBackend instead of AbstractStateBackend, according to the documentation. However, the documentation does not match the actual class we get from Maven, so there's that.

Did you simultaneously enable checkpointing and switch to using RocksDB, or were you successfully using RocksDB before, without checkpointing? - David Anderson
@DavidAnderson We had already enabled RocksDB before; I added details on that to the initial post, please check the most recent edit. However, without checkpointing, RocksDB is presumably just used to hold the state, which in our case is really small (in some cases just a boolean). I can only assume that's far less resource-intensive than actual checkpointing? - kopaka
Turning on checkpointing does increase the resource requirements somewhat. Are you really running 60 jobs at once on a single EC2 instance? And you turned on checkpointing for all of them at once? - David Anderson
@DavidAnderson Judging by that question, I assume it's rather unusual? The answer is yes, 60 jobs (mainly one source, one sink and one simple function in between), which actually was no big problem on a t3.medium instance without checkpointing. There was no heavy load, though, at most a few thousand records, but it worked like this. - kopaka
I don't know if it would be an improvement, but I'll just point out that it is possible to run multiple pipelines in the same job. Using this technique to reduce the total number of jobs should reduce the overall resource requirements. - David Anderson

1 Answer


Given that the 60 jobs you are running have rather trivial workloads, it is reasonable to think that turning on checkpointing is having a significant impact. Basically, I suspect that having 60-180 new threads (I'm not sure which of your operators are stateful), all trying to write to the filesystem frequently, is overwhelming your t3.medium instance.
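To make that 60-180 estimate concrete, here is a rough back-of-the-envelope sketch in plain Java. The class and method names are made up for illustration, and it assumes (simplistically) that every stateful operator starts one asynchronous write per checkpoint; real numbers depend on parallelism and on which operators actually hold state.

```java
public class CheckpointLoadEstimate {

    // Rough number of checkpoint writes started per second across the cluster,
    // assuming each stateful operator writes once per checkpoint (an assumption).
    static double writesPerSecond(int jobs, int statefulOperatorsPerJob, long intervalMs) {
        return jobs * statefulOperatorsPerJob * 1000.0 / intervalMs;
    }

    public static void main(String[] args) {
        int jobs = 60;

        // 1-second interval, as in env.enableCheckpointing(1000)
        System.out.println(writesPerSecond(jobs, 1, 1000));   // one stateful operator per job
        System.out.println(writesPerSecond(jobs, 3, 1000));   // source, function and sink all stateful

        // Same jobs with a 60-second interval
        System.out.println(writesPerSecond(jobs, 3, 60_000));
    }
}
```

With a one-second interval this works out to somewhere between 60 and 180 writes started every second, while a one-minute interval brings the worst case down to about 3 per second. That is why lengthening the interval may be the cheapest thing to try first.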

Checkpointing is managed by the checkpoint coordinator (in the Flink master), which communicates with all of the jobs, initiating the checkpoints, waiting for them to complete, and managing the metadata. Most of the work involved is done by the task managers, and is done asynchronously, so in your case that's a lot of new threads, each of which is copying the data being checkpointed to your checkpoint store (which should be a distributed filesystem, like S3). If the checkpointing interval is short, e.g., one second, then this is all happening every second.

You could inspect various metrics to try to figure out where the bottleneck is -- you may be limited by memory, or CPU, or network bandwidth.

With RocksDB, incremental checkpointing is generally lighter weight than doing full checkpoints, so selecting that option can help -- though with such small amounts of state, I don't expect this to help. More importantly, for performance reasons you should avoid using EBS (or any other network-attached storage) as the local disk for RocksDB.
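Putting these suggestions together, a configuration along the following lines might be worth trying. This is only a sketch, assuming Flink 1.9 with the flink-statebackend-rocksdb dependency on the classpath. The class name, the bucket name, the local directory and the interval values are placeholders I made up, not recommendations, and the s3:// URI only works if an S3 filesystem plugin is set up.

```java
import java.io.IOException;

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {

    // Hypothetical helper: applies checkpoint settings to an existing environment.
    static void configure(StreamExecutionEnvironment env) throws IOException {
        // Checkpoints go to a distributed store (placeholder bucket);
        // 'true' turns on incremental checkpointing.
        RocksDBStateBackend backend =
                new RocksDBStateBackend("s3://my-bucket/flink-checkpoints", true);

        // RocksDB's working files should live on a local, non-network disk
        // (placeholder path; only useful if the instance has such a disk).
        backend.setDbStoragePath("/mnt/local-disk/rocksdb");

        // The cast resolves the overload ambiguity mentioned in the question.
        env.setStateBackend((StateBackend) backend);

        // Checkpoint once a minute rather than once a second (illustrative value),
        // and leave some breathing room between consecutive checkpoints.
        env.enableCheckpointing(60_000);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
    }
}
```

Whether this is enough on a t3.medium is something only the metrics can tell you; it mainly reduces how often all 60 jobs hit the disk and network at the same time.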