import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
import sqlContext.implicits._  // assuming sqlContext is in scope; needed for .toDF()

case class Varnish(ID: String, varnish_latency: Float)

val seq = sc.sequenceFile[LongWritable, BytesWritable](logfile_path)
val usableRDD = seq.map { case (_, v: BytesWritable) => Text.decode(v.getBytes) }
                   .map(_.split(" "))
                   .map(p => Varnish(p(11), p(8).toFloat))
                   .toDF()
usableRDD.registerTempTable("Varnish")
sqlContext.sql("SELECT * FROM Varnish LIMIT 5").collect().foreach(println) // works fine
val countResult = sqlContext.sql("SELECT COUNT(*) FROM Varnish").collect() // throws Err
val cnt2 = countResult.head.getLong(0)

16/01/23 02:56:18 sparkDriver-akka.actor.default-dispatcher-20 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/01/23 02:56:18 Thread-3 INFO ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 57 in stage 1.0 failed 4 times, most recent failure:
Lost task 57.3 in stage 1.0 (TID 89, 10.1.201.14): java.lang.NumberFormatException: For input string: "nan"
at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1250)

1 Answer


The exception seems to be rather self-explanatory. Some of the values you pass contain the string "nan", which is not accepted as a valid Float representation:

scala> "nan".toFloat
java.lang.NumberFormatException: For input string: "nan"
...

Unless your data comes from a source that has already been validated (like an RDBMS or Parquet files), you should never blindly trust that it has the correct format. You can modify your code to handle this case and other malformed entries properly by using Options:

import scala.util.Try

case class Varnish(ID: String, varnish_latency: Option[Float])

...
  .map(p => Varnish(p(11), Try(p(8).toFloat).toOption))

drop the case class and cast the column after creating the DataFrame:

...
  .map(p => (p(11), p(8)))  // keep the latency as a raw String here
  .toDF("ID", "varnish_latency")
  .withColumn("varnish_latency", $"varnish_latency".cast("double"))  // values that can't be parsed become null

or pre-validate before you call .toFloat and drop malformed entries.
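A minimal sketch of that last variant, reusing the scala.util.Try import from above and keeping the original Varnish case class with a plain Float field, could look like this:

...
  .filter(p => Try(p(8).toFloat).isSuccess)  // drop rows whose latency doesn't parse
  .map(p => Varnish(p(11), p(8).toFloat))
  .toDF()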

With the first two options, malformed values end up as nulls. Since that is not semantically precise (not-a-number is not the same thing as a missing value) and results in a loss of information, you may prefer to handle the "nan" case explicitly. This can be done, for example, by replacing "nan" with "NaN" (a representation toFloat does accept) before calling toFloat, or by pattern matching:

p(8) match {
  case "nan" => Float.NaN
  case s => s.toFloat
}
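
The replacement variant can similarly be dropped straight into the existing map. A sketch, assuming latency values are either valid numbers or the literal lowercase "nan":

...
  .map(p => Varnish(p(11), p(8).replace("nan", "NaN").toFloat))  // "NaN".toFloat parses to Float.NaN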