
I created an HA Flink v1.2 cluster made up of 1 JobManager and 2 TaskManagers, each in its own VM (not using YARN or HDFS). After I start a job on the JobManager node, I kill one TaskManager instance. Immediately in the Web Dashboard I can see the job being cancelled and then failing. If I check the logs:

03/06/2017 16:23:50 Flat Map(1/2) switched to DEPLOYING 
03/06/2017 16:23:50 Flat Map(2/2) switched to SCHEDULED 
03/06/2017 16:23:50 Flat Map(2/2) switched to DEPLOYING 
03/06/2017 16:23:50 Flat Map(1/2) switched to RUNNING 
03/06/2017 16:23:50 Source: Custom Source -> Flat Map(1/2) switched to RUNNING 
03/06/2017 16:23:50 Flat Map(2/2) switched to RUNNING 
03/06/2017 16:23:50 Source: Custom Source -> Flat Map(2/2) switched to RUNNING 
03/06/2017 16:25:38 Flat Map(1/2) switched to FAILED 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'ip-10-106-0-238/10.106.0.238:40578'. This might indicate that the remote task manager was lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:118)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
    at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:294)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:829)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:610)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)

03/06/2017 16:25:38 Job execution switched to status FAILING.
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'ip-10-106-0-238/10.106.0.238:40578'. This might indicate that the remote task manager was lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:118)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
    at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:294)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:829)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:610)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)
03/06/2017 16:25:38 Source: Custom Source -> Flat Map(1/2) switched to CANCELING 
03/06/2017 16:25:38 Source: Custom Source -> Flat Map(2/2) switched to CANCELING 
03/06/2017 16:25:38 Flat Map(2/2) switched to CANCELING 
03/06/2017 16:25:38 Source: Custom Source -> Flat Map(1/2) switched to CANCELED 
03/06/2017 16:26:18 Source: Custom Source -> Flat Map(2/2) switched to CANCELED 
03/06/2017 16:26:18 Flat Map(2/2) switched to CANCELED 

In the job implementation I have:

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3,                            // number of restart attempts
        Time.of(10, TimeUnit.SECONDS) // delay between attempts
));

My question is: shouldn't the JobManager automatically redirect all requests to the remaining/running TaskManager? Similarly, if I start the JobManager and one TaskManager instance and run a job, shouldn't the second TaskManager instance also contribute to the running job once I start it?

Thanks!


1 Answer


First of all, the RestartStrategy has nothing to do with HA mode. High availability concerns the availability of the JobManager. In any case, for HA to work, at least two JobManager instances are required (you said you are starting just one).
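
For reference, a minimal sketch of what a standalone HA setup could look like. Hostnames and paths are placeholders, and the exact key names should be checked against the docs for your Flink version. Also note that the ZooKeeper storage directory must be on storage every node can reach (e.g. HDFS or NFS), which you said you are not using:

# conf/masters -- one JobManager per line (host:webui-port), placeholders
jobmanager-1:8081
jobmanager-2:8081

# conf/flink-conf.yaml (sketch; verify key names for your version)
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.zookeeper.storageDir: hdfs:///flink/ha/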

As for the RestartStrategy: when you specify the fixedDelayRestart strategy, the job is attempted again after a failure (as in your case, when you kill a TaskManager), in your case after 10 seconds. If that is not happening in your installation, you are probably missing the resources needed to run the job (I suppose you have 1 task slot per TaskManager, so when just one is left you can't run a job with parallelism 2 or more).
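
As a sketch, giving each TaskManager 2 slots would let a parallelism-2 job be rescheduled entirely on the surviving TaskManager. The restart strategy can also be set cluster-wide in flink-conf.yaml instead of in code (keys as in the Flink docs, values here are just examples):

# conf/flink-conf.yaml
taskmanager.numberOfTaskSlots: 2

# cluster-wide equivalent of the fixedDelayRestart set in the job code
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s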

As for the last question: adding a TaskManager does not contribute to already-running jobs. The related behaviour is called dynamic scaling. You can achieve it by taking a savepoint and then resubmitting the job with more resources. Have a look here. Automatic rescaling is work in progress.
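
A rough sketch of that manual rescaling workflow with the Flink CLI (the job ID, savepoint path and jar name are placeholders):

# find the ID of the running job
bin/flink list

# take a savepoint, then cancel the job
bin/flink savepoint <jobID>
bin/flink cancel <jobID>

# once the extra TaskManager has registered, resubmit from the savepoint
# with a higher parallelism
bin/flink run -s <savepointPath> -p 4 your-job.jar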