
I'm trying to sort a JavaPairRDD by key.

Configuration

Spark version: 1.3.0, mode: local

Can someone look at my code and point out where I'm going wrong?

    JavaPairRDD<String, HashMap<String, Object>> countAndSum = grupBydate
            .reduceByKey(new Function2<HashMap<String, Object>, HashMap<String, Object>, HashMap<String, Object>>() {
                @Override
                public HashMap<String, Object> call(HashMap<String, Object> v1,
                        HashMap<String, Object> v2) throws Exception {
                    long count = Long.parseLong(v1.get(SparkToolConstant.COUNT).toString())
                            + Long.parseLong(v2.get(SparkToolConstant.COUNT).toString());
                    Double sum = Double.parseDouble(v1.get(SparkToolConstant.VALUE).toString())
                            + Double.parseDouble(v2.get(SparkToolConstant.VALUE).toString());
                    HashMap<String, Object> sumMap = new HashMap<String, Object>();
                    sumMap.put(SparkToolConstant.COUNT, count);
                    sumMap.put(SparkToolConstant.VALUE, sum);
                    return sumMap;
                }
            });


System.out.println("count before sorting : "
                                        + countAndSum.count());



    // sort by date
    JavaPairRDD<String, HashMap<String, Object>> sortByDate = countAndSum
            .sortByKey(new Comparator<String>() {
                @Override
                public int compare(String dateStr1, String dateStr2) {
                    DateUtil dateUtil = new DateUtil();
                    Date date1 = dateUtil.stringToDate(dateStr1, dateFormat);
                    Date date2 = dateUtil.stringToDate(dateStr2, dateFormat);
                    if (date1 == null && date2 == null) {
                        return 0;
                    } else if (date1 != null && date2 != null) {
                        return date1.compareTo(date2);
                    } else if (date2 == null) {
                        return 1;
                    } else {
                        return -1;
                    }
                }
            });

I get the error here:

                        System.out.println("count after sorting : "
                                + sortByDate.count());

Stack trace when the job is submitted with spark-submit in local mode:

    SchedulerImpl:59 - Cancelling stage 252
    2015-04-29 14:37:19 INFO DAGScheduler:59 - Job 62 failed: count at DataValidation.java:378, took 0.107696 s
    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.reflect.InvocationTargetException
    sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    java.lang.reflect.Method.invoke(Method.java:606)
    org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240)
    org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:150)
    org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
    org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
    org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
    org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:58)
    org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:39)
    org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:835)
    org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1042)
    org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply(DAGScheduler.scala:1039)
    org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply(DAGScheduler.scala:1039)
    scala.Option.foreach(Option.scala:236)
    org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15.apply(DAGScheduler.scala:1039)
    org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15.apply(DAGScheduler.scala:1038)
    scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1038)
    org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1390)
    org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:847)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1042)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply(DAGScheduler.scala:1039)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply(DAGScheduler.scala:1039)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15.apply(DAGScheduler.scala:1039)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15.apply(DAGScheduler.scala:1038)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1038)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1390)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


please format your question correctly, writing a question like this is rude! – eliasah

1 Answer


Spark serializes the functions you pass to reduceByKey and sortByKey and ships them to the executors, so you have to make sure that those functions, and everything they reference, are serializable.
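
For example, here is a minimal sketch of a comparator Spark can serialize. The class name DateStringComparator and the use of SimpleDateFormat are my own illustration, not your DateUtil; it implements both Comparator and Serializable and keeps only serializable state:

    import java.io.Serializable;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Comparator;
    import java.util.Date;

    // A named comparator that Spark can serialize and ship to executors.
    public class DateStringComparator implements Comparator<String>, Serializable {
        private final String dateFormat; // e.g. "yyyy-MM-dd" (assumption)

        public DateStringComparator(String dateFormat) {
            this.dateFormat = dateFormat;
        }

        @Override
        public int compare(String dateStr1, String dateStr2) {
            Date date1 = parse(dateStr1);
            Date date2 = parse(dateStr2);
            if (date1 == null && date2 == null) {
                return 0;
            } else if (date1 != null && date2 != null) {
                return date1.compareTo(date2);
            } else if (date2 == null) {
                return 1;
            } else {
                return -1;
            }
        }

        // Create the non-serializable (and non-thread-safe) SimpleDateFormat
        // inside the method instead of holding it as a field.
        private Date parse(String s) {
            try {
                return new SimpleDateFormat(dateFormat).parse(s);
            } catch (ParseException e) {
                return null;
            }
        }
    }

An anonymous Comparator does not implement Serializable, and it also captures a reference to its enclosing instance, which then has to be serializable as well; a named class like this avoids both problems. You would call it as:

    JavaPairRDD<String, HashMap<String, Object>> sortByDate =
            countAndSum.sortByKey(new DateStringComparator(dateFormat));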

The references to SparkToolConstant and DateUtil inside your anonymous classes look like the cause of this error.
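
If DateUtil is one of your own classes, the most direct fix is to mark it serializable, roughly like this (a sketch, assuming it holds no non-serializable fields):

    // Sketch: a helper marked Serializable can safely be captured by the
    // anonymous functions that Spark ships to executors.
    public class DateUtil implements java.io.Serializable {
        // ... your existing stringToDate(String, String) implementation ...
    }

Alternatively, construct the helper inside call()/compare(), as the comparator sketch above does with SimpleDateFormat, so that nothing non-serializable is captured by the closure.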