0
votes

I am new to flink and i deployed my flink application which basically perform simple pattern matching. It is deployed in Kubernetes cluster with 1 JM and 6 TM. I am sending messages of size 4.4k and 200k messages every 10 min to eventhub topic and performing load testing. I added restart strategy and checking pointing as below and i am not explicitly using any states in my code as there is no requirement for it

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

 // start a checkpoint every 1000 ms
 env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE);
 // advanced options:
 // make sure 500 ms of progress happen between checkpoints
 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
 // checkpoints have to complete within one minute, or are discarded
 env.getCheckpointConfig().setCheckpointTimeout(120000);
 // allow only one checkpoint to be in progress at the same time
 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
 // enable externalized checkpoints which are retained after job cancellation
 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 // allow job recovery fallback to checkpoint when there is a more recent savepoint
 env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
         5, // number of restart attempts
         Time.of(5, TimeUnit.MINUTES) // delay
 ));

Initially i was facing Netty server issue with network buffer and i followed this link https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#taskmanager-network-memory-floating-buffers-per-gate flink network and heap memory optimizations and applied below settings and everything is working fine

taskmanager.network.memory.min: 256mb
taskmanager.network.memory.max: 1024mb
taskmanager.network.memory.buffers-per-channel: 8
taskmanager.memory.segment-size: 2mb
taskmanager.network.memory.floating-buffers-per-gate: 16
cluster.evenly-spread-out-slots: true
taskmanager.heap.size: 1024m
taskmanager.memory.framework.heap.size: 64mb
taskmanager.memory.managed.fraction: 0.7
taskmanager.memory.framework.off-heap.size: 64mb
taskmanager.memory.network.fraction: 0.4
taskmanager.memory.jvm-overhead.min: 256mb
taskmanager.memory.jvm-overhead.max: 1gb
taskmanager.memory.jvm-overhead.fraction: 0.4

But i have two below questions

  1. If any task manager restarts because of any failures the task manager is restarting successfully and getting registered with job manager but after the restarted task manager don't perform any processing of data it will sit idle. Is this normal flink behavior or do i need to add any setting to make task manager to start processing again.

  2. Sorry and correct me if my understanding is wrong, flink has a restart strategy in my code i made limit 5 attempts of restart. What will happen if my flink job is not successfully overcomes the task failure entire flink job will be remained in idle state and i have to restart job manually or is there any mechanism i can add to restart my job even after it crossed the limit of restart job attempts.

  3. Is there any document to calculate the number of cores and memory i should assign to flink job cluster based on data size and rate at which my system receives the data ?

  4. Is there any documentation on flink CEP optimization techniques?

  5. This is the error stack trace i am seeing in job manager

  1. I am seeing the below errors in my job manager logs before the pattern matching

    Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager '/10.244.9.163:46377'. This might indicate that the remote task manager was lost. at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:136) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:393) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:358) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1416) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:912) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:816) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918) at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at java.lang.Thread.run(Thread.java:748)

Thanks in advance, please help me in resolving my doubts

1

1 Answers

2
votes

Various points:

If your patterns involve matching temporal sequences (e.g., "A followed by B"), then you need state to do this. Most of Flink's sources and sinks also use state internally to record offsets, etc., and this state needs to be checkpointed if you care about exactly-once guarantees. If the patterns are being streamed in dynamically, then you'll want to store the patterns in Flink state as well.

Some of the comments in the code don't match the configuration parameters: e.g., "500 ms of progress" vs. 1000, "checkpoints have to complete within one minute" vs 120000. Also, keep in mind that the section of the documentation that you copied these settings from is not recommending best practices, but is instead illustrating how to make changes. In particular, env.getCheckpointConfig().setPreferCheckpointForRecovery(true); is a bad idea, and that config option should probably not exist.

Some of your entries in config.yaml are concerning. taskmanager.memory.managed.fraction is rather large (0.7) -- this only makes sense if you are using RocksDB, since managed memory has no other purpose for streaming. And taskmanager.memory.network.fraction and taskmanager.memory.jvm-overhead.fraction are both very large, and the sum of these three fractions is 1.5, which doesn't make sense.

In general the default network configuration works well across a wide range of deployment scenarios, and it is unusual to need to tune these settings, except in large clusters (which is not the case here). What sort of problems did you encounter?

As for your questions:

  1. After a TM failure and recovery, the TMs should automatically resume processing from the most recent checkpoint. To diagnose why this isn't happening, we'll need more information. To gain experience with a deployment that handles this correctly, you can experiment with the Flink Operations Playground.

  2. Once the configured restart strategy has played itself out, the job will FAIL, and Flink will no longer try to recover that job. You can, of course, build your own automation on top of Flink's REST API, if you want something more sophisticated.

  3. Documentation on capacity planning? No, not really. This is generally figured out through trial and error. Different applications tend to have different requirements in ways that are difficult to anticipate. Things like your choice of serializer, state backend, number of keyBys, the sources and sinks, key skew, watermarking, and so on can all have significant impacts.

  4. Documentation on optimizing CEP? No, sorry. The main points are

    • do everything you can to constrain the matches; avoid patterns that must keep state indefinitely
    • getEventsForPattern can be expensive