11
votes

The exception message is as follows:

User class threw exception: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 11, 10.215.155.82): java.lang.NullPointerException
    at org.joda.time.tz.CachedDateTimeZone.getInfo(CachedDateTimeZone.java:143)
    at org.joda.time.tz.CachedDateTimeZone.getOffset(CachedDateTimeZone.java:103)
    at org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:676)
    at org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:521)
    at org.joda.time.format.DateTimeFormatter.print(DateTimeFormatter.java:625)
    at org.joda.time.base.AbstractDateTime.toString(AbstractDateTime.java:328)
    at com.xxx.ieg.face.demo.DateTimeNullReferenceReappear$$anonfun$3$$anonfun$apply$1.apply(DateTimeNullReferenceReappear.scala:41)
    at com.xxx.ieg.face.demo.DateTimeNullReferenceReappear$$anonfun$3$$anonfun$apply$1.apply(DateTimeNullReferenceReappear.scala:41)
    at scala.collection.TraversableLike$$anonfun$groupBy$1.apply(TraversableLike.scala:328)
    at scala.collection.TraversableLike$$anonfun$groupBy$1.apply(TraversableLike.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.util.collection.CompactBuffer$$anon$1.foreach(CompactBuffer.scala:113)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.util.collection.CompactBuffer.foreach(CompactBuffer.scala:28)
    at scala.collection.TraversableLike$class.groupBy(TraversableLike.scala:327)
    at org.apache.spark.util.collection.CompactBuffer.groupBy(CompactBuffer.scala:28)
    at com.xxx.ieg.face.demo.DateTimeNullReferenceReappear$$anonfun$3.apply(DateTimeNullReferenceReappear.scala:41)
    at com.xxx.ieg.face.demo.DateTimeNullReferenceReappear$$anonfun$3.apply(DateTimeNullReferenceReappear.scala:40)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:1081)
    at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:1081)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

My code is as follows:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.{ SparkConf, SparkContext }
import org.joda.time.DateTime
import org.joda.time.format.{ DateTimeFormat, DateTimeFormatter }




object DateTimeNullReferenceReappear extends App {

  case class Record(uin: String = "", date: DateTime = null, value: Double = 0.0) 

  val cfg = new Configuration
  val sparkConf = new SparkConf()
  sparkConf.setAppName("bourne_exception_reappear")
  val sc = new SparkContext(sparkConf)

  val data = TDWSparkContext.tdwTable(   // this function just reads data from a data warehouse
    sc,
    tdwuser = FaceConf.TDW_USER,
    tdwpasswd = FaceConf.TDW_PASSWORD,
    dbName = "my_db",
    tblName = "my_table",
    parts = Array("p_20150323", "p_20150324", "p_20150325", "p_20150326", "p_20150327", "p_20150328", "p_20150329"))
    .map(row => {
      Record(uin = row(2),
        date = DateTimeFormat.forPattern("yyyyMMdd").parseDateTime(row(0)),
        value = row(4).toDouble)
    }).map(x => (x.uin, (x.date, x.value)))
    .groupByKey
    .map(x => {
      x._2.groupBy(_._1.toString("yyyyMMdd")).mapValues(_.map(_._2).sum)   // throws the exception here
    })

//      val data = TDWSparkContext.tdwTable(  // It works, as I don't use the DateTime toString in the groupBy
//      sc,
//      tdwuser = FaceConf.TDW_USER,
//      tdwpasswd = FaceConf.TDW_PASSWORD,
//      dbName = "hy",
//      tblName = "t_dw_cf_oss_tblogin",
//      parts = Array("p_20150323", "p_20150324", "p_20150325", "p_20150326", "p_20150327", "p_20150328", "p_20150329"))
//      .map(row => {
//        Record(uin = row(2),
//          date = DateTimeFormat.forPattern("yyyyMMdd").parseDateTime(row(0)),
//          value = row(4).toDouble)
//      }).map(x => (x.uin, (x.date.toString("yyyyMMdd"), x.value)))
//      .groupByKey
//      .map(x => {
//        x._2.groupBy(_._1).mapValues(_.map(_._2).sum)
//      })

  data.take(10).map(println)

}

So it seems that calling toString in the groupBy causes the exception. Can anybody explain it?

Thanks

Well... a NullPointerException occurs when you try to call a method on a variable that holds a null value. So... this means that in your x._2 there is some tuple whose first member (_._1) is null. - sarveshseri
Could you add the result of this? TDWSparkContext.tdwTable(sc, tdwuser = FaceConf.TDW_USER, tdwpasswd = FaceConf.TDW_PASSWORD, dbName = "my_db", tblName = "my_table", parts = Array("p_20150323", "p_20150324", "p_20150325", "p_20150326", "p_20150327", "p_20150328", "p_20150329")).map(row => { Record(uin = row(2), date = DateTimeFormat.forPattern("yyyyMMdd").parseDateTime(row(0)), value = row(4).toDouble) }).map(x => (x.uin, (x.date, x.value))) - Carlos Vilchez
Just replace groupBy(_._1.toString("yyyyMMdd")) with groupBy(d => { if (d._1 != null) d._1.toString("yyyyMMdd") else "I am a placeholder" }). You can choose to do whatever you want with your placeholders. - sarveshseri
@SarveshKumarSingh map(x => (x.uin, (x.date.toString("yyyyMMdd"), x.value))) does not throw an exception. Is it that RDD.map catches the NullPointerException but Array.groupBy does not? - bourneli
Well... yes. Spark performs a lot of cleaning on the mapping function to wrap it in a "safe" closure before actually serializing it and mapping the RDD. The cleaning function is def clean(func: AnyRef, checkSerializable: Boolean = true), defined in ClosureCleaner.scala (github.com/apache/spark/blob/master/core/src/main/scala/org/…). I am not really sure, but maybe that also safeguards against NullPointerExceptions and just generates a null value for such cases. - sarveshseri

6 Answers

12
votes

You need to either disable Kryo, use Kryo JodaTime Serializers, or avoid serializing the DateTime object, i.e. pass around Longs.
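
For illustration, a rough sketch of the first two options; the de.javakaffee kryo-serializers artifact and the JodaKryoRegistrator class name are my assumptions, not something this answer prescribes:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import org.joda.time.DateTime

// Option 1: disable Kryo and fall back to Java serialization:
// sparkConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")

// Option 2: keep Kryo but register a Joda-Time serializer
// (assumes the de.javakaffee "kryo-serializers" library is on the classpath):
class JodaKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[DateTime],
      new de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer())
  }
}

// ... and point Spark at it:
// sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// sparkConf.set("spark.kryo.registrator", "com.xxx.ieg.face.demo.JodaKryoRegistrator")

The third option simply keeps DateTime out of the shuffled data, e.g. by shipping date.getMillis as a Long and rebuilding the DateTime where it is formatted.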

1
votes

We don't know much about the "problem" yet, so let's try the following experiment, which will let us see more about it.

Replace the following part:

map(x => {
  x._2.groupBy(_._1.toString("yyyyMMdd")).mapValues(_.map(_._2).sum)   // throw exception here
})

with this:

import scala.util.{ Try, Success, Failure }   // needed for Try below

map(x => {
  x._2.groupBy(t => {
    // t._1 is the DateTime, the same key used in the original groupBy
    val dateStringTry = Try(t._1.toString("yyyyMMdd"))
    dateStringTry match {
      case Success(dateString) => Right(dateString)
      case Failure(e) => {
        println("=========== Null Tuple Description ==========")
        println("Problem Tuple :: [" + t + "]")
        println("Error Info :: [" + e.getMessage + "]")
        // finally the stack trace, if needed
        // e.printStackTrace()
        println("=============================================")
        Left(e)
      }
    }
  })
})

Let's check the result of running this experiment.

1
votes

The issue seems to be that DateTime loses something when serialized in Spark (which happens a lot there, I guess). In my case the Chronology was messed up, which caused the same exception.

One really hacky workaround that worked for me is to recreate the DateTime just before using it, e.g.:

date.toMutableDateTime.toDateTime

This seems to restore whatever bits were missing, and everything works after that.
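
Applied to the question's grouped values (an Iterable[(DateTime, Double)]), a minimal sketch might look like this; the helper name sumByDay is mine:

import org.joda.time.DateTime

// Rebuild each DateTime right before formatting it, so its zone/chronology is re-resolved:
def sumByDay(records: Iterable[(DateTime, Double)]): Map[String, Double] =
  records
    .groupBy(_._1.toMutableDateTime.toDateTime.toString("yyyyMMdd"))
    .mapValues(_.map(_._2).sum)
    .toMap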

The solution posted by Marius Soutier to disable Kryo also worked for me. This is a less hacky approach.

1
votes

The problem here is bad serialization of Joda's CachedDateTimeZone: it includes a transient field that doesn't get serialized and so remains null in the deserialized object.

You can create and register your own Serializer that handles this object properly:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.joda.time.DateTimeZone;
import org.joda.time.tz.CachedDateTimeZone;

public class JodaCachedDateTimeZoneSerializer extends Serializer<CachedDateTimeZone> {

    public JodaCachedDateTimeZoneSerializer() {
        setImmutable(true);
    }

    @Override
    public CachedDateTimeZone read(final Kryo kryo, final Input input, final Class<CachedDateTimeZone> type) {
        // reconstruct from serialized ID:
        final String id = input.readString();
        return CachedDateTimeZone.forZone(DateTimeZone.forID(id));
    }

    @Override
    public void write(final Kryo kryo, final Output output, final CachedDateTimeZone cached) {
        // serialize ID only:
        output.writeString(cached.getID());
    }
}

Then, in your class extending KryoRegistrator, add:

kryo.register(classOf[CachedDateTimeZone], new JodaCachedDateTimeZoneSerializer())
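
For example, a minimal sketch of such a registrator (the class name and package are placeholders, and it assumes the JodaCachedDateTimeZoneSerializer above is in scope):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import org.joda.time.tz.CachedDateTimeZone

// Placeholder registrator; adjust the name and package to your project.
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[CachedDateTimeZone], new JodaCachedDateTimeZoneSerializer())
  }
}

// And in the driver configuration:
// sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// sparkConf.set("spark.kryo.registrator", "com.xxx.ieg.face.demo.MyKryoRegistrator")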

This way you don't have to disable Kryo or refrain from using Joda.

1
votes
// Fall back to Spark's built-in Java serializer instead of Kryo:
sparkConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
0
votes

Please refer to https://issues.apache.org/jira/browse/SPARK-4170

Basically, you shouldn't be extending scala.App for your main class. It may not work correctly in some cases. Use an explicit main() method instead.

Here's the documented warning in the Spark 1.6.1 code (in the SparkSubmit class):

// SPARK-4170
if (classOf[scala.App].isAssignableFrom(mainClass)) {
  printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
}