I am new to flink and i deployed my flink application which basically perform simple pattern matching. It is deployed in Kubernetes cluster with 1 JM and 6 TM. I am sending messages of size 4.4k and 200k messages every 10 min to eventhub topic and performing load testing. I added restart strategy and checking pointing as below and i am not explicitly using any states in my code as there is no requirement for it
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms
env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE);
// advanced options:
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(120000);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// allow job recovery fallback to checkpoint when there is a more recent savepoint
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
5, // number of restart attempts
Time.of(5, TimeUnit.MINUTES) // delay
));
Initially i was facing Netty server issue with network buffer and i followed this link https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#taskmanager-network-memory-floating-buffers-per-gate flink network and heap memory optimizations and applied below settings and everything is working fine
taskmanager.network.memory.min: 256mb
taskmanager.network.memory.max: 1024mb
taskmanager.network.memory.buffers-per-channel: 8
taskmanager.memory.segment-size: 2mb
taskmanager.network.memory.floating-buffers-per-gate: 16
cluster.evenly-spread-out-slots: true
taskmanager.heap.size: 1024m
taskmanager.memory.framework.heap.size: 64mb
taskmanager.memory.managed.fraction: 0.7
taskmanager.memory.framework.off-heap.size: 64mb
taskmanager.memory.network.fraction: 0.4
taskmanager.memory.jvm-overhead.min: 256mb
taskmanager.memory.jvm-overhead.max: 1gb
taskmanager.memory.jvm-overhead.fraction: 0.4
But i have two below questions
If any task manager restarts because of any failures the task manager is restarting successfully and getting registered with job manager but after the restarted task manager don't perform any processing of data it will sit idle. Is this normal flink behavior or do i need to add any setting to make task manager to start processing again.
Sorry and correct me if my understanding is wrong, flink has a restart strategy in my code i made limit 5 attempts of restart. What will happen if my flink job is not successfully overcomes the task failure entire flink job will be remained in idle state and i have to restart job manually or is there any mechanism i can add to restart my job even after it crossed the limit of restart job attempts.
Is there any document to calculate the number of cores and memory i should assign to flink job cluster based on data size and rate at which my system receives the data ?
Is there any documentation on flink CEP optimization techniques?
This is the error stack trace i am seeing in job manager
I am seeing the below errors in my job manager logs before the pattern matching
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager '/10.244.9.163:46377'. This might indicate that the remote task manager was lost. at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:136) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:393) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:358) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1416) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:912) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:816) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918) at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at java.lang.Thread.run(Thread.java:748)
Thanks in advance, please help me in resolving my doubts