I have a Flink job which reads data from Kafka, performs certain aggregations, and writes the results to Elasticsearch indexes. I am seeing high backpressure on the source. The high backpressure results in data being read slowly from Kafka; I see data queued up in the network stack (netstat RecvQ shows tens of thousands of bytes stuck in the source Kafka connections; the data is eventually read), which in turn causes the data to be written to Elasticsearch after a lag, and this lag keeps increasing.
The source produces ~17,500 records per minute. The Flink job assigns (event) timestamps to each incoming record, applies 12 different keyBy operations, fits events into a 1-minute tumbling window, performs aggregations on these keyed, windowed streams, and finally writes the results to 12 different Elasticsearch indexes (each write is an insert).
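For context, here is a trimmed-down sketch of the job shape (topic, host and index names are placeholders, the records are simplified to CSV strings, and CountAggregate stands in for the real aggregations; ElasticsearchSinks.buildEsSink is my helper shown in a later snippet):

```java
import java.util.Properties;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class EventsJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "kafka-host:9092");
        kafkaProps.setProperty("group.id", "events-job");

        // Records are simplified here to "<epochMillis>,<key>,<value>" strings.
        DataStream<String> events = env
                .addSource(new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), kafkaProps))
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(10)) {
                            @Override
                            public long extractTimestamp(String record) {
                                return Long.parseLong(record.split(",")[0]);
                            }
                        });

        // One of the 12 keyBy -> 1-minute tumbling window -> aggregate -> Elasticsearch branches;
        // the other 11 differ only in key selector, aggregation and target index.
        events.keyBy(record -> record.split(",")[1])
              .window(TumblingEventTimeWindows.of(Time.minutes(1)))
              .aggregate(new CountAggregate())
              .addSink(ElasticsearchSinks.buildEsSink("index_1"));

        env.execute("events-aggregation-job");
    }

    /** Stand-in aggregation: counts records per key and window. */
    public static class CountAggregate implements AggregateFunction<String, Long, Long> {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(String value, Long acc) { return acc + 1; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    }
}
```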
The problem is that the data being written to Elasticsearch starts lagging, so the dashboards built on top of Elasticsearch are no longer real-time. My understanding is that this is happening because of the backpressure build-up, but I am not sure how to address it. The cluster itself is a VM-based, single-node standalone cluster with 64GB RAM (the task manager is configured to use 20GB) and 16 vCPUs. There is no evidence (as seen from htop) of CPU or memory being constrained. There is only one task manager, and this is the only Flink job on this cluster.
I am not sure whether this problem is due to some local resource issue on the cluster or due to the writes to Elasticsearch being slow. I have set setBulkFlushMaxActions to 1 (as is done in every code example I have seen); do I also need to set setBulkFlushInterval and/or setBulkFlushMaxSizeMb?
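Concretely, would changing the sink construction to something along these lines be the right direction? This is the buildEsSink helper referenced in the sketch above (Elasticsearch 6 connector; the host, index, document body and flush values are placeholders I would still have to tune):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

public class ElasticsearchSinks {

    /** Builds the sink for one index; the document body here is just illustrative. */
    public static ElasticsearchSink<Long> buildEsSink(final String index) {
        List<HttpHost> hosts = Collections.singletonList(new HttpHost("es-host", 9200, "http"));

        ElasticsearchSinkFunction<Long> indexer = new ElasticsearchSinkFunction<Long>() {
            @Override
            public void process(Long count, RuntimeContext ctx, RequestIndexer requestIndexer) {
                Map<String, Object> doc = new HashMap<>();
                doc.put("count", count);
                IndexRequest request = Requests.indexRequest()
                        .index(index)
                        .type("_doc")
                        .source(doc);
                requestIndexer.add(request);
            }
        };

        ElasticsearchSink.Builder<Long> builder = new ElasticsearchSink.Builder<>(hosts, indexer);

        // Today I use setBulkFlushMaxActions(1), i.e. every record becomes its own bulk
        // request (one HTTP round trip per record). The idea below is to batch by count,
        // size and time instead; the actual numbers would need tuning.
        builder.setBulkFlushMaxActions(500);   // flush after 500 buffered actions...
        builder.setBulkFlushMaxSizeMb(5);      // ...or after 5 MB of buffered data...
        builder.setBulkFlushInterval(1000L);   // ...or at least once per second

        return builder.build();
    }
}
```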
I have gone through https://www.da-platform.com/flink-forward-berlin/resources/improving-throughput-and-latency-with-flinks-network-stack but have not yet tried the tuning options listed on slide 19, as I am not sure what values to set for those parameters.
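For reference, I believe the options in question are the network buffer settings in flink-conf.yaml. I have not changed any of them; the values below are what I understand the defaults to be (around Flink 1.7), so please correct me if these are off:

```yaml
# flink-conf.yaml -- network buffer settings (values shown are, as far as I know, the defaults)
taskmanager.network.memory.fraction: 0.1                   # fraction of TM memory used for network buffers
taskmanager.network.memory.min: 67108864                   # 64 MB
taskmanager.network.memory.max: 1073741824                 # 1 GB
taskmanager.network.memory.buffers-per-channel: 2          # exclusive buffers per channel
taskmanager.network.memory.floating-buffers-per-gate: 8    # extra floating buffers shared per input gate
```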
Finally, I don't think I see the same issue when running the same job from within the IntelliJ IDE.
I am going to remove all aggregations and then add them back one by one, to see whether a particular aggregation triggers this issue.
Any specific pointers would be much appreciated. I will also try setBulkFlushInterval and setBulkFlushMaxSizeMb.
Update 1, 01/29/2019: It seems that both nodes are running at very high heap usage, so the GC is constantly running, trying to clear space in the JVM. I am getting the physical memory increased from 16 to 32GB and will then restart the nodes. That should hopefully resolve the problem; I will know within another 24 hours.