1
votes

Since the last couple week I build a DataStream programs in Flink in scala. But I have a strange behavior, flink uses lots of more memory than I expected.

I have a 4 ListState of tuple(Int, long) in my processFunction keyed by INT, I use it to get different unique Counter in a different time frame, and I expected the most of the memory was used by this List.

But it's not the case. So I print an histo live of the JVM. And I was surprised how many memories are used.

 num     #instances         #bytes  class name
----------------------------------------------
   1:     138920685     6668192880  java.util.HashMap$Node
   2:     138893041     5555721640  org.apache.flink.streaming.api.operators.InternalTimer
   3:     149680624     3592334976  java.lang.Integer
   4:      48313229     3092046656  org.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry
   5:      14042723     2579684280  [Ljava.lang.Object;
   6:          4492     2047983264  [Ljava.util.HashMap$Node;
   7:      41686732     1333975424  com.myJob.flink.tupleState
   8:           201      784339688  [Lorg.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry;
   9:      17230300      689212000  com.myJob.flink.uniqStruct
  10:      14025040      561001600  java.util.ArrayList
  11:       8615581      413547888  com.myJob.flink.Data$FingerprintCnt
  12:       6142006      393088384  com.myJob.flink.ProcessCountStruct
  13:       4307549      172301960  com.myJob.flink.uniqresult
  14:       4307841      137850912  com.myJob.flink.Data$FingerprintUniq
  15:       2153904      137849856  com.myJob.flink.Data$StreamData
  16:       1984742       79389680  scala.collection.mutable.ListBuffer
  17:       1909472       61103104  scala.collection.immutable.$colon$colon
  18:         22200       21844392  [B
  19:        282624        9043968  org.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
  20:         59045        6552856  [C
  21:         33194        2655520  java.nio.DirectByteBuffer
  22:         32804        2361888  sun.misc.Cleaner
  23:            35        2294600  [Lscala.concurrent.forkjoin.ForkJoinTask;
  24:           640        2276352  [Lorg.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
  25:         32768        2097152  org.apache.flink.core.memory.HybridMemorySegment
  26:         12291        2082448  java.lang.Class
  27:         58591        1874912  java.lang.String
  28:          8581        1372960  java.lang.reflect.Method
  29:         32790        1311600  java.nio.DirectByteBuffer$Deallocator
  30:         18537         889776  java.util.concurrent.ConcurrentHashMap$Node
  31:          4239         508680  java.lang.reflect.Field
  32:          8810         493360  java.nio.HeapByteBuffer
  33:          7389         472896  java.util.HashMap
  34:          5208         400336  [I

The tupple(Int, long) is com.myJob.flink.tupleState in 7th position. And I see the tuple use less than 2G of memory.

I don't understand why flink used this amount of memory for these classes.

Can anyone give me a light on this behavior, thanks in advance.


Update:

I run my job on a stand alone cluster (1 jobManager, 3 taskManager)

the flink version is 1.5-SNAPSHOT commit : e4486ae

I get the histo live on one taskManager node.


Update 2 :

In my processFunction I used :

ctx.timerService.registerProcessingTimeTimer(ctx.timestamp + 100)

And after on onTimer function, I process my listState to check all old data. so it create a timer for each call on processFunction.

but why the timer is steel on memory after onTimer function triggered

1
You might need to use a heap analyzer or a debugger to trace those objects back towards their GC roots.Seth Tisue
Did you run the flink job in a local mode (local cluster)? Or did you dump the task manager? Could you maybe post some source code (or maybe a link to a github repo)TobiSH
@SethTisue About using a heapAnaliser it's complicated because each JVM have 45 GB of heap size, and If I down the memory with 4 GB I can't have the same state.Lip
@TobiSH I update my post.Lip
It would appear that you have a lot of timers. And are you perhaps keeping Flink state for a long time before clearing it (or worse, never clearing it at all)? If you can share the code (or pseudo code) it will be easier to diagnose.David Anderson

1 Answers

0
votes

How many windows do you end up with? Based on the top two entries what are are seeing is the "timers" that are used by Flink to track when to clean up the window. For every key in the window you will end up with (key, endTimestamp) effectively in the timer state. If you have a very large number of windows (perhaps out of order time or delayed watermarking) or a very large number of keys in each window, those will each take up memory.

Note that even if you are using RocksDB state, the TimerService uses Heap memory so you have to watch out for that.