1
votes

My Flink job frequently goes OOM on one task manager or another. I have enough memory and storage for the job (2 JobManagers / 16 TaskManagers, each with 15 cores and 63 GB RAM). Sometimes the job runs for 4 days before throwing an OOM, sometimes it goes OOM within 2 days, yet the traffic is consistent compared to previous days.

I received a suggestion not to pass objects through the streaming pipeline and instead to use primitives, to reduce shuffling overhead and memory consumption.

The Flink job I work on is written in Java. Let's say the below is my pipeline:

Kafka source
    deserialize (bytes are converted to a Java object; the object contains String, int, and long fields)

    FirstKeyedWindow    (the deserialized Java objects from above are received here)
        reduce

    SecondKeyedWindow (the reduced Java objects from above are received here)
        reduce

Kafka sink (the Java objects above are serialized into bytes and produced to Kafka)
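
For context, a minimal sketch of what that pipeline shape looks like with the DataStream API is below. The Event POJO, the key selectors, the window sizes, and the (de)serialization schemas are all placeholders I made up here, not my actual job code:

    // Sketch only: Event, EventDeserializationSchema, EventSerializationSchema,
    // the key selectors, and the window sizes are placeholders.
    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", "kafka:9092");   // placeholder

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Event> events = env.addSource(
            new FlinkKafkaConsumer<>("input-topic", new EventDeserializationSchema(), kafkaProps));

    DataStream<Event> deduped = events
            .keyBy(Event::getFirstKey)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .reduce(new DedupReduce());          // FirstKeyedWindow: drop duplicates within the group

    DataStream<Event> aggregated = deduped
            .keyBy(Event::getSecondKey)
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .reduce(new CountingReduce());       // SecondKeyedWindow: update the count in the object

    aggregated.addSink(
            new FlinkKafkaProducer<>("output-topic", new EventSerializationSchema(), kafkaProps));

    env.execute("dedup-and-count");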

My question is: what should I consider to reduce the overhead and memory consumption? Will replacing String with a char array help reduce overhead a bit? Or should I deal only with bytes throughout the pipeline? If I serialize the objects between the keyed windows, will that help reduce the overhead? But if I have to read the bytes back, then I need to deserialize, use them as required, and serialize them again. Wouldn't that create more serialization/deserialization overhead?

I appreciate your suggestions. Heads-up: I am talking about 10 TB of data received per day.

Update 1:

The exception I see for OOM is as below:

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'host/host:port'. This might indicate that the remote task manager was lost.

Answering David Anderson's comments below: the Flink version used is v1.11. The state backend is RocksDB, file system based. The job is running out of heap memory. Each message from the Kafka source is up to 300 bytes. The first reduce function does deduplication (removes duplicates within the same group); the second reduce function does aggregation (updates the count within the object).
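
To make the reduce semantics concrete, here is a rough sketch of the two reduce steps as described above, using a hypothetical Event POJO with a count field (not the actual implementation):

    // Sketch only: Event and its accessors are hypothetical.
    public class DedupReduce implements ReduceFunction<Event> {
        @Override
        public Event reduce(Event value1, Event value2) {
            // duplicates share the same key, so keeping one representative is enough
            return value1;
        }
    }

    public class CountingReduce implements ReduceFunction<Event> {
        @Override
        public Event reduce(Event value1, Event value2) {
            Event merged = new Event(value1);   // allocates a new object per invocation
            merged.setCount(value1.getCount() + value2.getCount());
            return merged;                      // see the answer below for the object-reuse variant
        }
    }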

Update 2:

After thorough exploration, I found that Flink falls back to its default Kryo serializer for my types, which is inefficient. I understood that custom serializers can help reduce the overhead if we define one instead of relying on the Kryo default. I am now trying out google-protobuf to see if it performs any better.
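
Following the approach from Flink's documentation on registering custom serializers with Kryo, the registration itself is a one-liner. In the sketch below, MyProtoMessage stands in for a protobuf-generated class of mine, and ProtobufSerializer comes from the com.twitter:chill-protobuf dependency:

    import com.twitter.chill.protobuf.ProtobufSerializer;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Tell Kryo to use the protobuf serializer for this generated message type
    env.getConfig().registerTypeWithKryoSerializer(MyProtoMessage.class, ProtobufSerializer.class);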

I am also looking into increasing taskmanager.network.memory.fraction to a value that suits my job's parallelism. I have yet to find the right calculation for setting this configuration.
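
For reference, the network memory options moved under taskmanager.memory.network.* in Flink 1.10+, so on v1.11 I would be tuning something like the following in flink-conf.yaml (the values here are placeholders, not a recommendation):

    # placeholder values; the right fraction depends on parallelism and shuffle volume
    taskmanager.memory.network.fraction: 0.2
    taskmanager.memory.network.min: 256mb
    taskmanager.memory.network.max: 2gb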

1
It could depend on the window size. If you are using a time window, maybe at some window you are receiving more bytes than your TaskManager can handle. If you want to speed up the (de)serialization, it could be worthwhile to use Avro. – Felipe
Please give us more info. Which state backend, and which version of Flink are you using? What error do you get, exactly -- are you running out of heap? And just to confirm: is this with the DataStream API? I doubt this relates to how you're doing ser/de, but how large are these objects? – David Anderson
One more thing: the window reduce functions -- what sort of data structure are you reducing into? – David Anderson
Updated my question with your comments addressed. Thank you. – Deepak

1 Answer

1
votes

I am answering my own question here, since what I tried has worked for me. I found extra metrics in Grafana that are tied to my Flink job; two of them are GC time and GC count, and I saw some significant spikes in these GC (garbage collection) metrics. The likely reason is that I had a lot of new object creation going on in the job pipeline, and considering the TBs of data I am dealing with and the 20 billion records per day, this object creation went haywire. I optimized the code to reuse objects as much as I can, and that reduced the memory consumption.
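
As an illustration of the reuse change, the counting reduce went from allocating a fresh object on every call to mutating and returning one of the incoming objects. This is a sketch with a hypothetical Event POJO; with the RocksDB backend the incoming values are fresh deserialized copies, so mutating them is safe here:

    public class CountingReduce implements ReduceFunction<Event> {
        @Override
        public Event reduce(Event value1, Event value2) {
            // reuse value1 instead of building a new Event for every invocation
            value1.setCount(value1.getCount() + value2.getCount());
            return value1;
        }
    }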

And I increased taskmanager.network.memory to the required value (it is set to 1 GB by default).

In my question above, I talked about custom serializers to reduce the network overhead. I tried implementing a protobuf serializer with Kryo, but the protobuf-generated classes are final: if I have to update the objects, I have to create new objects, which will again create spikes in the GC metrics. So I avoided using it. Maybe I can further change the protobuf-generated classes to suit my needs; I will consider that step if things remain inconsistent.