I'm trying to sort a JavaPairRDD by key.
Configuration:
Spark version: 1.3.0, mode: local
Can someone look at my code and tell me where I'm going wrong?
// Aggregate per date key: add up the running COUNT and VALUE entries of the
// two partial maps and emit a fresh map holding the combined totals.
// (Values are stored as Object, so both sides are re-parsed from their
// string form before being summed.)
JavaPairRDD<String, HashMap<String, Object>> countAndSum = grupBydate
        .reduceByKey(new Function2<HashMap<String, Object>, HashMap<String, Object>, HashMap<String, Object>>() {
            @Override
            public HashMap<String, Object> call(
                    HashMap<String, Object> left,
                    HashMap<String, Object> right) throws Exception {
                long leftCount = Long.parseLong(
                        left.get(SparkToolConstant.COUNT).toString());
                long rightCount = Long.parseLong(
                        right.get(SparkToolConstant.COUNT).toString());
                double leftValue = Double.parseDouble(
                        left.get(SparkToolConstant.VALUE).toString());
                double rightValue = Double.parseDouble(
                        right.get(SparkToolConstant.VALUE).toString());

                HashMap<String, Object> merged = new HashMap<String, Object>();
                merged.put(SparkToolConstant.COUNT, leftCount + rightCount);
                merged.put(SparkToolConstant.VALUE, leftValue + rightValue);
                return merged;
            }
        });
System.out.println("count before sorting : " + countAndSum.count());
/*
 * Sort by date key.
 *
 * The comparator handed to sortByKey is serialized and shipped to the
 * executors, but a plain anonymous java.util.Comparator is NOT
 * Serializable -- which is exactly what the "Task serialization failed:
 * java.lang.reflect.InvocationTargetException" in the stack trace is
 * complaining about. Fix: use a named comparator class that implements
 * both Comparator<String> and java.io.Serializable. The date format is
 * passed in through the constructor so the class does not have to
 * capture anything from the enclosing scope.
 *
 * NOTE(review): assumes dateFormat is a String -- adjust the field type
 * if it is not. If serialization still fails, hoist DateKeyComparator
 * out of this method into a *static* nested (or top-level) class: a
 * local class declared inside an instance method keeps a hidden
 * reference to the enclosing `this`, which would then also have to be
 * serializable.
 */
class DateKeyComparator implements Comparator<String>, java.io.Serializable {
    private static final long serialVersionUID = 1L;

    private final String format;

    DateKeyComparator(String format) {
        this.format = format;
    }

    @Override
    public int compare(String dateStr1, String dateStr2) {
        DateUtil dateUtil = new DateUtil();
        Date date1 = dateUtil.stringToDate(dateStr1, format);
        Date date2 = dateUtil.stringToDate(dateStr2, format);
        if (date1 == null && date2 == null) {
            return 0;      // both unparseable: treat as equal
        }
        if (date1 == null) {
            return -1;     // unparseable keys sort first, as in the original
        }
        if (date2 == null) {
            return 1;
        }
        return date1.compareTo(date2);
    }
}
JavaPairRDD<String, HashMap<String, Object>> sortByDate = countAndSum
        .sortByKey(new DateKeyComparator(dateFormat));
I'm getting the error here:
System.out.println("count after sorting : "
+ sortByDate.count());
Stack trace when the task is submitted to Spark via spark-submit in local mode:
SchedulerImpl:59 - Cancelling stage 252 2015-04-29 14:37:19 INFO DAGScheduler:59 - Job 62 failed: count at DataValidation.java:378, took 0.107696 s Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.reflect.InvocationTargetException sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) java.lang.reflect.Method.invoke(Method.java:606) org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240) org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:150) org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99) org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158) org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99) org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:58) org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:39) org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80) org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:835) org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1042) org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply(DAGScheduler.scala:1039) 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply(DAGScheduler.scala:1039) scala.Option.foreach(Option.scala:236) org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15.apply(DAGScheduler.scala:1039) org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15.apply(DAGScheduler.scala:1038) scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1038) org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1390) org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:847) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1042) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply(DAGScheduler.scala:1039) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15$$anonfun$apply$1.apply(DAGScheduler.scala:1039) at scala.Option.foreach(Option.scala:236) at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15.apply(DAGScheduler.scala:1039) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$15.apply(DAGScheduler.scala:1038) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1038) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1390) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Blockquote