0
votes

Following is my simple code. When I run it in Spark Local mode it runs perfectly. But when I Try to run it in cluster mode with 1 driver and 1 worker it gives me following exception.

I have tried setJars which is mentioned in some answers but it hasn't helped me.

public static void main(String[] args) throws IOException {

        SparkConf conf = new SparkConf().setAppName("example.ClusterPractice").setMaster("spark://192.168.42.18:7077");
        conf.setJars(new String[]{"E:\\Eclipses\\neon new projects\\eclipse\\neon new projects\\spark-practice\\out\\artifacts\\spark_practice_jar\\spark-practice.jar"});

        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3));

        System.out.println("Reduce");
        long total = numbers.reduce((n1,n2)-> n1+n2);
        System.out.println(total);
    }

Exception I am getting is as follows :

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2131) at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1029) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) at org.apache.spark.rdd.RDD.reduce(RDD.scala:1011) at org.apache.spark.api.java.JavaRDDLike$class.reduce(JavaRDDLike.scala:385) at org.apache.spark.api.java.AbstractJavaRDDLike.reduce(JavaRDDLike.scala:45) at example.ClusterPractice.main(ClusterPractice.java:22) Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.fun$2 of type org.apache.spark.api.java.function.Function2 in instance of org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1 at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133) at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748)

2

2 Answers

3
votes

You can find detailed answer to your question here

It seems you are removing the jars that has been set using

conf.setJars(new String[]{"E:\\Eclipses\\neon new projects\\eclipse\\neon new projects\\spark-practice\\out\\artifacts\\spark_practice_jar\\spark-practice.jar"});

from the configuration with this line

conf.setJars(new String[]{""});

Remove this line and it will work.

0
votes

Above program works perfectly.

The issue was in building the jar. So don't doubt the program just focus on whether jar is getting built properly or not.

In my case, I am using Intellij. I was doing build artifact from build option and I think due to it jar was not getting built properly as it is maven project.

So, when I did maven build jar got built properly and program ran smoothly.