
I'm running an Apache Flink program on a cluster of three nodes. One of them acts as both JobManager and TaskManager; the other two are TaskManagers only.

When I start my program (I submit it on the JobManager node), I get the following error after about a minute in which the program makes no real progress:

java.lang.Exception: TaskManager was lost/killed: c4211322e77548b791c70d466c138a49 @ giordano-2-2-100-1 (dataPort=37904)
at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1131)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:44)
at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
at akka.actor.ActorCell.invoke(ActorCell.scala:486)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

where giordano-2-2-100-1 is the address of the combined JobManager/TaskManager node. I set the number of task slots equal to the number of machine cores (2) and set the heap memory according to the available memory reported by meminfo.
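Concretely, the relevant entries in flink-conf.yaml look roughly like this (the heap sizes below are illustrative placeholders, not my exact values):

# one slot per core on a 2-core machine
taskmanager.numberOfTaskSlots: 2
# heap sizes in MB, chosen from the memory reported by meminfo
taskmanager.heap.mb: 2048
jobmanager.heap.mb: 1024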

During execution (before the error appears) I watched CPU usage and noticed that the two cores of the combined JobManager/TaskManager node are busy (at least 50% each, sometimes even 100% for one of them), while the other two nodes (the pure TaskManagers) are completely idle, with CPU usage around 0%.

I set the RPC address of the JobManager correctly and filled the slaves file with:

giordano-2-2-100-1
giordano-2-2-100-2
giordano-2-2-100-3
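
i.e., flink-conf.yaml on every node points at the master:

# address of the JobManager node
jobmanager.rpc.address: giordano-2-2-100-1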

Moreover, I used ping from the master node to verify that the other nodes are reachable, and telnet from the TaskManagers to verify that the JobManager is reachable; in both cases everything is fine.
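The checks were along these lines (6123 is Flink's default JobManager RPC port; I'm assuming the default here):

# from the master node
ping giordano-2-2-100-2
ping giordano-2-2-100-3

# from each TaskManager node
telnet giordano-2-2-100-1 6123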

Honestly, I'm out of ideas about what I'm doing wrong...

Furthermore, I tried running the program on my laptop (dual core), setting up a single-node cluster with the same configuration as the real cluster and the same jar. In this case everything works perfectly, so I'm fairly sure the problem is in the JobManager.

P.S. On Stack Overflow I found this answer to the same problem: TaskManager loss/killed, but I don't understand how to set a different garbage collector.
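Update: if I understand that answer correctly, JVM options can be passed to Flink via env.java.opts in flink-conf.yaml, so switching to the G1 collector should look something like:

# extra JVM options for JobManager and TaskManagers
env.java.opts: -XX:+UseG1GC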

What is the parallelism with which you start the job? Can it be that you are running a CPU-intensive job which is only executed on the machine on which the JobManager is running? Then this could block the heartbeats from being processed correctly. Try not starting a TM on the machine where the JM is running. Sharing the logs of the JM and the TMs could also help in solving your problem. – Till Rohrmann
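For reference, the job-wide parallelism being asked about is what gets passed at submission time, e.g. (the jar name is a placeholder):

# submit the job with a default parallelism of 2
bin/flink run -p 2 myJob.jar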
I commented out the parallelism setting because I have some operators that operate at parallelism 2 and some at 1. I have run more experiments. I noticed that up to a certain load the cluster performs the task correctly (but using just one node), while if I add more load (duplicating operators on other data, for example) the previous error appears. I set the G1 garbage collector; no results. Continued... – Akinn
I followed your hint, configuring 1 JM and just 2 TMs, and executed a relatively light program. It is executed correctly but, even in this case, just on one node (node 3). Each core of that node was busy at over 92% CPU usage, with memory at about 80-85%. I attach the logs (stackOV_1JM_2TM_OK): dropbox.com/s/w6uvgmhdg2bom49/stackOV_1JM_2TM_OK.zip?dl=0 – Akinn
Then I executed a heavy version with many operators (the one that fails) in order to lead Flink to distribute the load. In this case just node 2 is occupied (one core at 90%, the other at 30%). The job doesn't produce results and crashes, killing the TM. Logs (stackOV_1JM_2TM_HEAVY_ERROR): dropbox.com/s/rbyhn1o8ama9we3/… – Akinn
Is it a network-intensive job? My thought is that maybe a lot of data is constantly being requested by and sent to the Flink job, and things are getting backed up because you don't have enough bandwidth to the cluster to handle the volume. – Jicaar

1 Answer


This problem happened to me when a TaskManager ran out of memory and the GC took too much time trying to free some memory.

Try using more RAM or decreasing the memory requirements of your tasks.
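For example, assuming the old-style memory keys of a Flink 1.x setup (the value below is just an illustration; pick one that fits your machines), you could raise the TaskManager heap in flink-conf.yaml and restart the cluster:

# larger heap for each TaskManager, in MB
taskmanager.heap.mb: 4096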