Since the last couple week I build a DataStream programs in Flink in scala. But I have a strange behavior, flink uses lots of more memory than I expected.
I have a 4 ListState of tuple(Int, long) in my processFunction keyed by INT, I use it to get different unique Counter in a different time frame, and I expected the most of the memory was used by this List.
But it's not the case. So I print an histo live of the JVM. And I was surprised how many memories are used.
num #instances #bytes class name
----------------------------------------------
1: 138920685 6668192880 java.util.HashMap$Node
2: 138893041 5555721640 org.apache.flink.streaming.api.operators.InternalTimer
3: 149680624 3592334976 java.lang.Integer
4: 48313229 3092046656 org.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry
5: 14042723 2579684280 [Ljava.lang.Object;
6: 4492 2047983264 [Ljava.util.HashMap$Node;
7: 41686732 1333975424 com.myJob.flink.tupleState
8: 201 784339688 [Lorg.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry;
9: 17230300 689212000 com.myJob.flink.uniqStruct
10: 14025040 561001600 java.util.ArrayList
11: 8615581 413547888 com.myJob.flink.Data$FingerprintCnt
12: 6142006 393088384 com.myJob.flink.ProcessCountStruct
13: 4307549 172301960 com.myJob.flink.uniqresult
14: 4307841 137850912 com.myJob.flink.Data$FingerprintUniq
15: 2153904 137849856 com.myJob.flink.Data$StreamData
16: 1984742 79389680 scala.collection.mutable.ListBuffer
17: 1909472 61103104 scala.collection.immutable.$colon$colon
18: 22200 21844392 [B
19: 282624 9043968 org.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
20: 59045 6552856 [C
21: 33194 2655520 java.nio.DirectByteBuffer
22: 32804 2361888 sun.misc.Cleaner
23: 35 2294600 [Lscala.concurrent.forkjoin.ForkJoinTask;
24: 640 2276352 [Lorg.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
25: 32768 2097152 org.apache.flink.core.memory.HybridMemorySegment
26: 12291 2082448 java.lang.Class
27: 58591 1874912 java.lang.String
28: 8581 1372960 java.lang.reflect.Method
29: 32790 1311600 java.nio.DirectByteBuffer$Deallocator
30: 18537 889776 java.util.concurrent.ConcurrentHashMap$Node
31: 4239 508680 java.lang.reflect.Field
32: 8810 493360 java.nio.HeapByteBuffer
33: 7389 472896 java.util.HashMap
34: 5208 400336 [I
The tupple(Int, long) is com.myJob.flink.tupleState in 7th position. And I see the tuple use less than 2G of memory.
I don't understand why flink used this amount of memory for these classes.
Can anyone give me a light on this behavior, thanks in advance.
Update:
I run my job on a stand alone cluster (1 jobManager, 3 taskManager)
the flink version is 1.5-SNAPSHOT commit : e4486ae
I get the histo live on one taskManager node.
Update 2 :
In my processFunction I used :
ctx.timerService.registerProcessingTimeTimer(ctx.timestamp + 100)
And after on onTimer
function, I process my listState
to check all old data.
so it create a timer for each call on processFunction.
but why the timer is steel on memory after onTimer
function triggered