2
votes

I'm writing a Spark Streaming app in Scala. The goal of the app is to consume the latest records from Kafka and print them to stdout.
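
The app is roughly of this shape (a simplified sketch, not the actual code; it assumes the direct-stream API, KafkaUtils.createDirectStream, which matches the KafkaRDD frames in the traces below, and the broker list, topic name, and batch interval are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Main {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-to-stdout")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Placeholder broker list and topic; the real values are cluster-specific.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
    val topics = Set("my-topic")

    // Direct stream of (key, value) pairs from Kafka.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // Print the first records of each batch to stdout.
    stream.map(_._2).print()

    ssc.start()
    ssc.awaitTermination()
  }
}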

The app works perfectly when I run it locally using --master local[n]. However, when I run the app in YARN (and produce to the topic that I am consuming from), the app gets stuck at:

16/11/18 20:53:05 INFO JobScheduler: Added jobs for time 1479502385000 ms

After repeating the line above several times, Spark gives the following error:

16/11/18 20:54:47 WARN TaskSetManager: Lost task 0.0 in stage 9.0 (TID 9, r3d3.hadoop.REDACTED.REDACTED): java.net.ConnectException: Connection timed out
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:454)
at sun.nio.ch.Net.connect(Net.java:446)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at org.apache.spark.util.NextIterator.to(NextIterator.scala:21)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at org.apache.spark.util.NextIterator.toBuffer(NextIterator.scala:21)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at org.apache.spark.util.NextIterator.toArray(NextIterator.scala:21)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Error from the streaming UI:

org.apache.spark.streaming.dstream.DStream.print(DStream.scala:757)
com.REDACTED.bdp.Main$.main(Main.scala:88)
com.REDACTED.bdp.Main.main(Main.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Errors from YARN application logs (stdout):

java.lang.NullPointerException
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
        at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:66)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
        at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:60)
        at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:79)
        at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:77)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:77)
        at org.apache.spark.scheduler.Task.run(Task.scala:91)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
[2016-11-21 15:57:49,925] ERROR Exception in task 0.1 in stage 33.0 (TID 34) (org.apache.spark.executor.Executor)
org.apache.spark.util.TaskCompletionListenerException
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:91)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Another error from YARN application logs:

[2016-11-21 15:52:32,264] WARN Exception encountered while connecting to the server :  (org.apache.hadoop.ipc.Client)
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby
        at org.apache.hadoop.security.SaslRpcClient.saslConnect(SaslRpcClient.java:375)
        at org.apache.hadoop.ipc.Client$Connection.setupSaslConnection(Client.java:558)
        at org.apache.hadoop.ipc.Client$Connection.access$1800(Client.java:373)
        at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:727)
        at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:723)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
        at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:722)
        at org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:373)
        at org.apache.hadoop.ipc.Client.getConnection(Client.java:1493)
        at org.apache.hadoop.ipc.Client.call(Client.java:1397)
        at org.apache.hadoop.ipc.Client.call(Client.java:1358)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
        at com.sun.proxy.$Proxy9.getFileInfo(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:252)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
        at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source)
        at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2116)
        at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1315)
        at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1311)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1311)
        at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1424)
        at org.apache.spark.deploy.yarn.Client$.org$apache$spark$deploy$yarn$Client$$sparkJar(Client.scala:1195)
        at org.apache.spark.deploy.yarn.Client$.populateClasspath(Client.scala:1333)
        at org.apache.spark.deploy.yarn.ExecutorRunnable.prepareEnvironment(ExecutorRunnable.scala:290)
        at org.apache.spark.deploy.yarn.ExecutorRunnable.env$lzycompute(ExecutorRunnable.scala:61)
        at org.apache.spark.deploy.yarn.ExecutorRunnable.env(ExecutorRunnable.scala:61)
        at org.apache.spark.deploy.yarn.ExecutorRunnable.startContainer(ExecutorRunnable.scala:80)
        at org.apache.spark.deploy.yarn.ExecutorRunnable.run(ExecutorRunnable.scala:68)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

The weird part is that about 5% of the time, the app reads from Kafka successfully, for whatever reason.

The cluster and YARN seem to be working properly. The cluster is secured using Kerberos.

What might be the source of this error?

"5% of the time, the app reads from Kafka successfully" and "the cluster is secured using Kerberos" make me think about the time between these events. Could it be that after 5 minutes your authentication token expires, and that's when your streaming jobs start to fail? (I've never worked with kerberized/secured Spark clusters.) r3d3.hadoop.REDACTED.REDACTED is a host with a Spark executor, isn't it? Can you paste the Streaming tab from the web UI from the beginning to the first failure? - Jacek Laskowski

Look at the YARN logs to see what exactly is happening to the executors: locate the YARN job ID in the Spark driver logs (something like application_xxxx_xxxxxxxx) and use it to search the YARN UI, or use the command line: yarn status <id> ; yarn logs -applicationId <id> - Samson Scharfrichter

My job fails immediately after starting if I produce to the Kafka topic it is trying to read from. - dqian96

1 Answer

0
votes

tl;dr This answer does not offer a solution; it merely suggests a possible next step.

My understanding is that a Lost task event is reported for a streaming job when the job was executed but could not finish, which in your case points to a connection issue between a Spark executor and a Kafka broker.

16/11/18 20:54:47 WARN TaskSetManager: Lost task 0.0 in stage 9.0 (TID 9, r3d3.hadoop.REDACTED.REDACTED): java.net.ConnectException: Connection timed out
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:454)
at sun.nio.ch.Net.connect(Net.java:446)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)

The pattern of the error message is as follows:

Lost task [id] in stage [taskSetId] (TID [tid], [host], executor [executorId]): [reason]

which in your case means the Spark executor was running on host r3d3.hadoop.REDACTED.REDACTED.

The reason for the failure is what follows:

java.net.ConnectException: Connection timed out
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:454)
at sun.nio.ch.Net.connect(Net.java:446)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)

I would then ask myself when a Kafka broker could be unavailable to a client (which in your case is a Spark Streaming application; that fact may or may not help in understanding the root cause of the issue).
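
To rule out basic network reachability between that executor host and the broker, a quick TCP probe run from the executor host could help (just a sketch; the broker host and port below are placeholders for whatever your metadata.broker.list points at):

import java.net.{InetSocketAddress, Socket}

// Minimal connectivity check: attempt a TCP connection to the Kafka broker
// with a 10-second timeout and report the outcome.
object BrokerProbe {
  def main(args: Array[String]): Unit = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress("kafka-broker.example.com", 9092), 10000)
      println("Broker port is reachable")
    } catch {
      case e: java.io.IOException =>
        println(s"Could not connect: ${e.getMessage}")
    } finally {
      socket.close()
    }
  }
}

If the probe times out on the executor nodes but succeeds from the driver or your workstation, the problem is most likely network or firewall configuration rather than Spark itself.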

I think it might be unrelated to Apache Spark and would look for more answers in Kafka circles.