We are running a 5-node Flink cluster (1.6.3) on Kubernetes, with a 5-partition Kafka topic as the source. 5 jobs read from that topic (each with a different consumer group), each with parallelism = 5.
Each task manager runs with 10 GB of RAM, and the task manager heap size is limited to 2 GB. The ingestion load is rather small (100-200 messages per second) and the average message size is ~4-8 KB. All jobs run fine for a few hours. Then we suddenly see one or more jobs failing with:
java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:666)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
at sun.nio.ch.IOUtil.read(IOUtil.java:195)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)
Flink restarts the job, but it keeps failing with the same exception. We've tried reducing the record poll as suggested here: Kafka Consumers throwing java.lang.OutOfMemoryError: Direct buffer memory. We also tried increasing the Kafka heap size as suggested here: Flink + Kafka, java.lang.OutOfMemoryError when parallelism > 1, although I fail to understand how a failure to allocate memory in the Flink process has anything to do with the JVM memory of the Kafka broker process, and I don't see anything indicating an OOM in the broker logs.
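For context, the consumer-side knobs involved in "reducing the record poll" look roughly like the sketch below. The class name `ConsumerFetchLimits` and all values are illustrative assumptions, not our exact settings; the property keys are standard Kafka consumer settings. As far as I understand, `max.poll.records` only caps how many records a single `poll()` returns, while the size of the network fetch is bounded by the byte limits, so both are shown.

```java
import java.util.Properties;

public class ConsumerFetchLimits {

    // Builds consumer properties that bound how much data one poll/fetch can return.
    // All values are illustrative assumptions, not recommendations.
    static Properties boundedFetchProperties() {
        Properties props = new Properties();
        // Cap on the number of records handed back by one poll() call.
        props.setProperty("max.poll.records", "100");
        // Cap on the bytes fetched per partition in one fetch request (512 KiB here).
        props.setProperty("max.partition.fetch.bytes", String.valueOf(512 * 1024));
        // Cap on the bytes fetched per request across partitions (5 partitions x 512 KiB here).
        props.setProperty("fetch.max.bytes", String.valueOf(5 * 512 * 1024));
        return props;
    }

    public static void main(String[] args) {
        Properties props = boundedFetchProperties();
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

A `Properties` object like this is what gets passed to the Flink Kafka consumer's constructor, alongside the usual `bootstrap.servers` and `group.id` entries.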
What might be the cause of this failure? What else should we check?
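One check we can think of is reading the JVM's direct buffer pool usage from inside the process, since the failing frames (`ByteBuffer.allocateDirect` called from `sun.nio.ch.Util.getTemporaryDirectBuffer`) allocate from that pool, whose ceiling is controlled by the `-XX:MaxDirectMemorySize` JVM option. Below is a minimal sketch using the standard `BufferPoolMXBean`; the class name `DirectBufferReport` and the 1 MiB test allocation are only for illustration, and in practice the same reading would have to be taken inside the task manager JVM (or via JMX) rather than in a standalone program.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.List;

public class DirectBufferReport {

    // Returns the bytes currently used by the JVM's "direct" buffer pool,
    // or -1 if no pool with that name is exposed by this JVM.
    static long directMemoryUsed() {
        List<BufferPoolMXBean> pools =
                ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
        for (BufferPoolMXBean pool : pools) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long before = directMemoryUsed();
        // Illustrative allocation so the change in the pool is visible.
        ByteBuffer buffer = ByteBuffer.allocateDirect(1024 * 1024);
        long after = directMemoryUsed();
        System.out.println("direct pool before: " + before + " bytes");
        System.out.println("direct pool after:  " + after + " bytes");
        System.out.println("allocated capacity: " + buffer.capacity() + " bytes");
    }
}
```

If the reported usage climbs steadily over the hours before the failure, that would point at direct buffers accumulating in the Flink process rather than anything on the broker side.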
Thanks!